diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..27246bf --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1 @@ +2cf0e060ed126537dd993896b6aa793e2a6b9e80 diff --git a/.github/workflows/build-and-release.yml b/.github/workflows/build-and-release.yml index 1f60c59..9cd962f 100644 --- a/.github/workflows/build-and-release.yml +++ b/.github/workflows/build-and-release.yml @@ -14,23 +14,17 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-13, macos-latest] - python: ["3.9"] + python: ["3.11"] steps: - uses: actions/checkout@v4 - - uses: actions/setup-python@v5 + - name: Install uv + uses: astral-sh/setup-uv@v7 with: python-version: ${{ matrix.python }} - name: Set up project - if: matrix.os != 'windows-latest' - run: ./scripts/setup - - - name: Set up project on windows - if: matrix.os == 'windows-latest' - # For some reason, `pip install --upgrade pip` doesn't work on - # 'windows-latest'. The installed pip version works fine however. - run: ./scripts/setup --no-pip + run: uv sync - name: Run checks run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 573cad9..2a2848c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,82 @@ ambiguous situations. ## Unreleased +## Added +- Store the description when using the `internet-shortcut` link format +- Support for basic auth with the kit-ipd crawler + +## Fixed +- Event loop errors on Windows with Python 3.14 +- Sanitize `/` in headings in kit-ipd crawler +- Crawl info tab again + +## 3.8.3 - 2025-07-01 + +## Added +- Support for link collections. + In "fancy" mode, a single HTML file with multiple links is generated. + In all other modes, PFERD creates a folder for the collection and a new file + for every link inside. + +## Fixed +- Crawling of exercises with instructions +- Don't download unavailable elements. 
+ Elements that are unavailable (for example, because their availability is + time restricted) will not download the HTML for the info page anymore. +- `base_url` argument for `ilias-web` crawler causing crashes + +## 3.8.2 - 2025-04-29 + +## Changed +- Explicitly mention that wikis are not supported at the moment and ignore them + +## Fixed +- Ilias-native login +- Exercise crawling + +## 3.8.1 - 2025-04-17 + +## Fixed +- Description html files now specify at UTF-8 encoding +- Images in descriptions now always have a white background + +## 3.8.0 - 2025-04-16 + +### Added +- Support for ILIAS 9 + +### Changed +- Added prettier CSS to forum threads +- Downloaded forum threads now link to the forum instead of the ILIAS thread +- Increase minimum supported Python version to 3.11 +- Do not crawl nested courses (courses linked in other courses) + +## Fixed +- File links in report on Windows +- TOTP authentication in KIT Shibboleth +- Forum crawling only considering the first 20 entries + +## 3.7.0 - 2024-11-13 + +### Added +- Support for MOB videos in page descriptions +- Clickable links in the report to directly open new/modified/not-deleted files +- Support for non KIT shibboleth login + +### Changed +- Remove videos from description pages +- Perform ILIAS cycle detection after processing the transform to allow + ignoring duplicated elements +- Parse headings (h1-h3) as folders in kit-ipd crawler + +### Fixed +- Personal desktop/dashboard/favorites crawling +- Crawling of nested courses +- Downloading of links with no target URL +- Handle row flex on description pages +- Add `` heading to forum threads to fix mime type detection +- Handle groups in cards + ## 3.6.0 - 2024-10-23 ### Added diff --git a/CONFIG.md b/CONFIG.md index a52506d..b87f75c 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -153,6 +153,7 @@ requests is likely a good idea. - `link_regex`: A regex that is matched against the `href` part of links. If it matches, the given link is downloaded as a file. 
This is used to extract files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`) +- `auth`: Name of auth section to use for basic authentication. (Optional) ### The `ilias-web` crawler @@ -163,12 +164,15 @@ out of the box for the corresponding universities: [ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" -| University | `base_url` | `client_id` | -|---------------|--------------------------------------|---------------| -| FH Aachen | https://www.ili.fh-aachen.de | elearning | -| Uni Köln | https://www.ilias.uni-koeln.de/ilias | uk | -| Uni Konstanz | https://ilias.uni-konstanz.de | ILIASKONSTANZ | -| Uni Stuttgart | https://ilias3.uni-stuttgart.de | Uni_Stuttgart | +| University | `base_url` | `login_type` | `client_id` | +|-----------------|-----------------------------------------|--------------|---------------| +| FH Aachen | https://www.ili.fh-aachen.de | local | elearning | +| HHU Düsseldorf | https://ilias.hhu.de | local | UniRZ | +| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk | +| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ | +| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart | +| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | | +| KIT ILIAS Pilot | https://pilot.ilias.studium.kit.edu | shibboleth | pilot | If your university isn't listed, try navigating to your instance's login page. Assuming no custom login service is used, the URL will look something like this: @@ -180,7 +184,11 @@ Assuming no custom login service is used, the URL will look something like this: If the values work, feel free to submit a PR and add them to the table above. - `base_url`: The URL where the ILIAS instance is located. (Required) -- `client_id`: An ID used for authentication. (Required) +- `login_type`: How you authenticate. (Required) + - `local`: Use `client_id` for authentication. 
+ - `shibboleth`: Use shibboleth for authentication. +- `client_id`: An ID used for authentication if `login_type` is `local`. Is + ignored if `login_type` is `shibboleth`. - `target`: The ILIAS element to crawl. (Required) - `desktop`: Crawl your personal desktop / dashboard - ``: Crawl the course with the given id @@ -191,6 +199,8 @@ If the values work, feel free to submit a PR and add them to the table above. and duplication warnings if you are a member of an ILIAS group. The `desktop` target is generally preferable. - `auth`: Name of auth section to use for login. (Required) +- `tfa_auth`: Name of auth section to use for two-factor authentication. Only + uses the auth section's password. (Default: Anonymous `tfa` authenticator) - `links`: How to represent external links. (Default: `fancy`) - `ignore`: Don't download links. - `plaintext`: A text file containing only the URL. diff --git a/DEV.md b/DEV.md index f577b93..8cc42c2 100644 --- a/DEV.md +++ b/DEV.md @@ -9,30 +9,25 @@ particular [this][ppug-1] and [this][ppug-2] guide). ## Setting up a dev environment -The use of [venv][venv] is recommended. To initially set up a development -environment, run these commands in the same directory as this file: +The use of [venv][venv] and [uv][uv] is recommended. To initially set up a +development environment, run these commands in the same directory as this file: ``` -$ python -m venv .venv +$ uv sync $ . .venv/bin/activate -$ ./scripts/setup ``` -The setup script installs a few required dependencies and tools. It also -installs PFERD via `pip install --editable .`, which means that you can just run -`pferd` as if it was installed normally. Since PFERD was installed with -`--editable`, there is no need to re-run `pip install` when the source code is -changed. - -If you get any errors because pip can't update itself, try running -`./scripts/setup --no-pip` instead of `./scripts/setup`. +This install all required dependencies and tools. 
It also installs PFERD as +*editable*, which means that you can just run `pferd` as if it was installed +normally. Since PFERD was installed with `--editable`, there is no need to +re-run `uv sync` when the source code is changed. For more details, see [this part of the Python Tutorial][venv-tut] and [this section on "development mode"][ppug-dev]. [venv]: "venv - Creation of virtual environments" [venv-tut]: "12. Virtual Environments and Packages" -[ppug-dev]: "Working in “development mode”" +[uv]: "uv - An extremely fast Python package and project manager" ## Checking and formatting the code diff --git a/LICENSE b/LICENSE index 13fa307..ccccbe3 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim, thelukasprobst, Toorero, - Mr-Pine, p-fruck + Mr-Pine, p-fruck, PinieP Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/PFERD/__main__.py b/PFERD/__main__.py index cb8c67c..2de9dbc 100644 --- a/PFERD/__main__.py +++ b/PFERD/__main__.py @@ -133,7 +133,8 @@ def main() -> None: # https://bugs.python.org/issue39232 # https://github.com/encode/httpx/issues/914#issuecomment-780023632 # TODO Fix this properly - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) loop.run_until_complete(pferd.run(args.debug_transforms)) loop.run_until_complete(asyncio.sleep(1)) loop.close() diff --git a/PFERD/auth/__init__.py b/PFERD/auth/__init__.py index aa3ba8e..7295c7a 100644 --- a/PFERD/auth/__init__.py +++ b/PFERD/auth/__init__.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..config import Config from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 @@ -9,21 +9,19 @@ from .pass_ import PassAuthenticator, 
PassAuthSection from .simple import SimpleAuthenticator, SimpleAuthSection from .tfa import TfaAuthenticator -AuthConstructor = Callable[[ - str, # Name (without the "auth:" prefix) - SectionProxy, # Authenticator's section of global config - Config, # Global config -], Authenticator] +AuthConstructor = Callable[ + [ + str, # Name (without the "auth:" prefix) + SectionProxy, # Authenticator's section of global config + Config, # Global config + ], + Authenticator, +] -AUTHENTICATORS: Dict[str, AuthConstructor] = { - "credential-file": lambda n, s, c: - CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), - "keyring": lambda n, s, c: - KeyringAuthenticator(n, KeyringAuthSection(s)), - "pass": lambda n, s, c: - PassAuthenticator(n, PassAuthSection(s)), - "simple": lambda n, s, c: - SimpleAuthenticator(n, SimpleAuthSection(s)), - "tfa": lambda n, s, c: - TfaAuthenticator(n), +AUTHENTICATORS: dict[str, AuthConstructor] = { + "credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), + "keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)), + "pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)), + "simple": lambda n, s, c: SimpleAuthenticator(n, SimpleAuthSection(s)), + "tfa": lambda n, s, c: TfaAuthenticator(n), } diff --git a/PFERD/auth/authenticator.py b/PFERD/auth/authenticator.py index 643a2d5..417b7ba 100644 --- a/PFERD/auth/authenticator.py +++ b/PFERD/auth/authenticator.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from typing import Tuple from ..config import Section @@ -35,7 +34,7 @@ class Authenticator(ABC): self.name = name @abstractmethod - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: pass async def username(self) -> str: diff --git a/PFERD/auth/credential_file.py b/PFERD/auth/credential_file.py index 94ffa73..cb7834c 100644 --- a/PFERD/auth/credential_file.py +++ b/PFERD/auth/credential_file.py @@ -1,5 +1,4 @@ 
from pathlib import Path -from typing import Tuple from ..config import Config from ..utils import fmt_real_path @@ -23,7 +22,9 @@ class CredentialFileAuthenticator(Authenticator): with open(path, encoding="utf-8") as f: lines = list(f) except UnicodeDecodeError: - raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8") + raise AuthLoadError( + f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8" + ) from None except OSError as e: raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e @@ -42,5 +43,5 @@ class CredentialFileAuthenticator(Authenticator): self._username = uline[9:] self._password = pline[9:] - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: return self._username, self._password diff --git a/PFERD/auth/keyring.py b/PFERD/auth/keyring.py index c14f6fb..414640a 100644 --- a/PFERD/auth/keyring.py +++ b/PFERD/auth/keyring.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Optional import keyring @@ -17,7 +17,6 @@ class KeyringAuthSection(AuthSection): class KeyringAuthenticator(Authenticator): - def __init__(self, name: str, section: KeyringAuthSection) -> None: super().__init__(name) @@ -28,7 +27,7 @@ class KeyringAuthenticator(Authenticator): self._password_invalidated = False self._username_fixed = section.username() is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: # Request the username if self._username is None: async with log.exclusive_output(): diff --git a/PFERD/auth/pass_.py b/PFERD/auth/pass_.py index 4c8e775..c5d9b24 100644 --- a/PFERD/auth/pass_.py +++ b/PFERD/auth/pass_.py @@ -1,6 +1,5 @@ import re import subprocess -from typing import List, Tuple from ..logging import log from .authenticator import Authenticator, AuthError, AuthSection @@ -12,11 +11,11 @@ class PassAuthSection(AuthSection): self.missing_value("passname") return value - 
def username_prefixes(self) -> List[str]: + def username_prefixes(self) -> list[str]: value = self.s.get("username_prefixes", "login,username,user") return [prefix.lower() for prefix in value.split(",")] - def password_prefixes(self) -> List[str]: + def password_prefixes(self) -> list[str]: value = self.s.get("password_prefixes", "password,pass,secret") return [prefix.lower() for prefix in value.split(",")] @@ -31,14 +30,14 @@ class PassAuthenticator(Authenticator): self._username_prefixes = section.username_prefixes() self._password_prefixes = section.password_prefixes() - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: log.explain_topic("Obtaining credentials from pass") try: log.explain(f"Calling 'pass show {self._passname}'") result = subprocess.check_output(["pass", "show", self._passname], text=True) except subprocess.CalledProcessError as e: - raise AuthError(f"Failed to get password info from {self._passname}: {e}") + raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e prefixed = {} unprefixed = [] diff --git a/PFERD/auth/simple.py b/PFERD/auth/simple.py index 831c12f..dea4b67 100644 --- a/PFERD/auth/simple.py +++ b/PFERD/auth/simple.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Optional from ..logging import log from ..utils import agetpass, ainput @@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator): self._username_fixed = self.username is not None self._password_fixed = self.password is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: if self._username is not None and self._password is not None: return self._username, self._password diff --git a/PFERD/auth/tfa.py b/PFERD/auth/tfa.py index 26b1383..6ae48fe 100644 --- a/PFERD/auth/tfa.py +++ b/PFERD/auth/tfa.py @@ -1,5 +1,3 @@ -from typing import Tuple - from ..logging import log from ..utils import ainput from .authenticator import 
Authenticator, AuthError @@ -17,7 +15,7 @@ class TfaAuthenticator(Authenticator): code = await ainput("TFA code: ") return code - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: raise AuthError("TFA authenticator does not support usernames") def invalidate_username(self) -> None: diff --git a/PFERD/cli/command_ilias_web.py b/PFERD/cli/command_ilias_web.py index 77a1657..b68e48f 100644 --- a/PFERD/cli/command_ilias_web.py +++ b/PFERD/cli/command_ilias_web.py @@ -21,23 +21,20 @@ GROUP.add_argument( "--base-url", type=str, metavar="BASE_URL", - help="The base url of the ilias instance" + help="The base url of the ilias instance", ) GROUP.add_argument( "--client-id", type=str, metavar="CLIENT_ID", - help="The client id of the ilias instance" + help="The client id of the ilias instance", ) configure_common_group_args(GROUP) -def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, -) -> None: +def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None: log.explain(f"Creating config for command '{COMMAND_NAME}'") parser["crawl:ilias"] = {} @@ -45,8 +42,8 @@ def load( load_crawler(args, section) section["type"] = COMMAND_NAME - if args.ilias_url is not None: - section["base_url"] = args.ilias_url + if args.base_url is not None: + section["base_url"] = args.base_url if args.client_id is not None: section["client_id"] = args.client_id diff --git a/PFERD/cli/command_kit_ilias_web.py b/PFERD/cli/command_kit_ilias_web.py index 10797c2..b3b45c5 100644 --- a/PFERD/cli/command_kit_ilias_web.py +++ b/PFERD/cli/command_kit_ilias_web.py @@ -21,8 +21,8 @@ configure_common_group_args(GROUP) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain(f"Creating config for command '{COMMAND_NAME}'") diff --git a/PFERD/cli/command_kit_ipd.py b/PFERD/cli/command_kit_ipd.py index b53e67e..a80af03 100644 
--- a/PFERD/cli/command_kit_ipd.py +++ b/PFERD/cli/command_kit_ipd.py @@ -18,25 +18,30 @@ GROUP.add_argument( "--link-regex", type=str, metavar="REGEX", - help="href-matching regex to identify downloadable files" + help="href-matching regex to identify downloadable files", +) +GROUP.add_argument( + "--basic-auth", + action="store_true", + help="enable basic authentication", ) GROUP.add_argument( "target", type=str, metavar="TARGET", - help="url to crawl" + help="url to crawl", ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory" + help="output directory", ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'kit-ipd'") @@ -50,5 +55,11 @@ def load( if args.link_regex: section["link_regex"] = str(args.link_regex) + if args.basic_auth: + section["auth"] = "auth:kit-ipd" + parser["auth:kit-ipd"] = {} + auth_section = parser["auth:kit-ipd"] + auth_section["type"] = "simple" + SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_local.py b/PFERD/cli/command_local.py index 309c42f..6016afa 100644 --- a/PFERD/cli/command_local.py +++ b/PFERD/cli/command_local.py @@ -18,37 +18,37 @@ GROUP.add_argument( "target", type=Path, metavar="TARGET", - help="directory to crawl" + help="directory to crawl", ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory" + help="output directory", ) GROUP.add_argument( "--crawl-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for crawl requests" + help="artificial delay to simulate for crawl requests", ) GROUP.add_argument( "--download-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for download requests" + help="artificial delay to simulate for download requests", ) GROUP.add_argument( "--download-speed", type=int, metavar="BYTES_PER_SECOND", - help="download speed to 
simulate" + help="download speed to simulate", ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'local'") diff --git a/PFERD/cli/common_ilias_args.py b/PFERD/cli/common_ilias_args.py index bbbbee5..edad6da 100644 --- a/PFERD/cli/common_ilias_args.py +++ b/PFERD/cli/common_ilias_args.py @@ -12,58 +12,60 @@ def configure_common_group_args(group: argparse._ArgumentGroup) -> None: "target", type=str, metavar="TARGET", - help="course id, 'desktop', or ILIAS URL to crawl" + help="course id, 'desktop', or ILIAS URL to crawl", ) group.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory" + help="output directory", ) group.add_argument( - "--username", "-u", + "--username", + "-u", type=str, metavar="USERNAME", - help="user name for authentication" + help="user name for authentication", ) group.add_argument( "--keyring", action=BooleanOptionalAction, - help="use the system keyring to store and retrieve passwords" + help="use the system keyring to store and retrieve passwords", ) group.add_argument( "--credential-file", type=Path, metavar="PATH", - help="read username and password from a credential file" + help="read username and password from a credential file", ) group.add_argument( "--links", type=show_value_error(Links.from_string), metavar="OPTION", - help="how to represent external links" + help="how to represent external links", ) group.add_argument( "--link-redirect-delay", type=int, metavar="SECONDS", - help="time before 'fancy' links redirect to to their target (-1 to disable)" + help="time before 'fancy' links redirect to to their target (-1 to disable)", ) group.add_argument( "--videos", action=BooleanOptionalAction, - help="crawl and download videos" + help="crawl and download videos", ) group.add_argument( "--forums", action=BooleanOptionalAction, - help="crawl and download forum posts" + 
help="crawl and download forum posts", ) group.add_argument( - "--http-timeout", "-t", + "--http-timeout", + "-t", type=float, metavar="SECONDS", - help="timeout for all HTTP requests" + help="timeout for all HTTP requests", ) diff --git a/PFERD/cli/parser.py b/PFERD/cli/parser.py index be483fd..c9bec13 100644 --- a/PFERD/cli/parser.py +++ b/PFERD/cli/parser.py @@ -1,8 +1,9 @@ import argparse import configparser from argparse import ArgumentTypeError +from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Callable, List, Optional, Sequence, Union +from typing import Any, Optional from ..output_dir import OnConflict, Redownload from ..version import NAME, VERSION @@ -15,15 +16,15 @@ class ParserLoadError(Exception): # TODO Replace with argparse version when updating to 3.9? class BooleanOptionalAction(argparse.Action): def __init__( - self, - option_strings: List[str], - dest: Any, - default: Any = None, - type: Any = None, - choices: Any = None, - required: Any = False, - help: Any = None, - metavar: Any = None, + self, + option_strings: list[str], + dest: Any, + default: Any = None, + type: Any = None, + choices: Any = None, + required: Any = False, + help: Any = None, + metavar: Any = None, ): if len(option_strings) != 1: raise ValueError("There must be exactly one option string") @@ -48,11 +49,11 @@ class BooleanOptionalAction(argparse.Action): ) def __call__( - self, - parser: argparse.ArgumentParser, - namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], - option_string: Optional[str] = None, + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: str | Sequence[Any] | None, + option_string: Optional[str] = None, ) -> None: if option_string and option_string in self.option_strings: value = not option_string.startswith("--no-") @@ -67,11 +68,13 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]: Some validation functions (like the from_string 
in our enums) raise a ValueError. Argparse only pretty-prints ArgumentTypeErrors though, so we need to wrap our ValueErrors. """ + def wrapper(input: str) -> Any: try: return inner(input) except ValueError as e: - raise ArgumentTypeError(e) + raise ArgumentTypeError(e) from e + return wrapper @@ -81,52 +84,57 @@ CRAWLER_PARSER_GROUP = CRAWLER_PARSER.add_argument_group( description="arguments common to all crawlers", ) CRAWLER_PARSER_GROUP.add_argument( - "--redownload", "-r", + "--redownload", + "-r", type=show_value_error(Redownload.from_string), metavar="OPTION", - help="when to download a file that's already present locally" + help="when to download a file that's already present locally", ) CRAWLER_PARSER_GROUP.add_argument( "--on-conflict", type=show_value_error(OnConflict.from_string), metavar="OPTION", - help="what to do when local and remote files or directories differ" + help="what to do when local and remote files or directories differ", ) CRAWLER_PARSER_GROUP.add_argument( - "--transform", "-T", + "--transform", + "-T", action="append", type=str, metavar="RULE", - help="add a single transformation rule. Can be specified multiple times" + help="add a single transformation rule. 
Can be specified multiple times", ) CRAWLER_PARSER_GROUP.add_argument( - "--tasks", "-n", + "--tasks", + "-n", type=int, metavar="N", - help="maximum number of concurrent tasks (crawling, downloading)" + help="maximum number of concurrent tasks (crawling, downloading)", ) CRAWLER_PARSER_GROUP.add_argument( - "--downloads", "-N", + "--downloads", + "-N", type=int, metavar="N", - help="maximum number of tasks that may download data at the same time" + help="maximum number of tasks that may download data at the same time", ) CRAWLER_PARSER_GROUP.add_argument( - "--task-delay", "-d", + "--task-delay", + "-d", type=float, metavar="SECONDS", - help="time the crawler should wait between subsequent tasks" + help="time the crawler should wait between subsequent tasks", ) CRAWLER_PARSER_GROUP.add_argument( "--windows-paths", action=BooleanOptionalAction, - help="whether to repair invalid paths on windows" + help="whether to repair invalid paths on windows", ) def load_crawler( - args: argparse.Namespace, - section: configparser.SectionProxy, + args: argparse.Namespace, + section: configparser.SectionProxy, ) -> None: if args.redownload is not None: section["redownload"] = args.redownload.value @@ -152,79 +160,79 @@ PARSER.add_argument( version=f"{NAME} {VERSION} (https://github.com/Garmelon/PFERD)", ) PARSER.add_argument( - "--config", "-c", + "--config", + "-c", type=Path, metavar="PATH", - help="custom config file" + help="custom config file", ) PARSER.add_argument( "--dump-config", action="store_true", - help="dump current configuration to the default config path and exit" + help="dump current configuration to the default config path and exit", ) PARSER.add_argument( "--dump-config-to", metavar="PATH", - help="dump current configuration to a file and exit." - " Use '-' as path to print to stdout instead" + help="dump current configuration to a file and exit. 
Use '-' as path to print to stdout instead", ) PARSER.add_argument( "--debug-transforms", action="store_true", - help="apply transform rules to files of previous run" + help="apply transform rules to files of previous run", ) PARSER.add_argument( - "--crawler", "-C", + "--crawler", + "-C", action="append", type=str, metavar="NAME", - help="only execute a single crawler." - " Can be specified multiple times to execute multiple crawlers" + help="only execute a single crawler. Can be specified multiple times to execute multiple crawlers", ) PARSER.add_argument( - "--skip", "-S", + "--skip", + "-S", action="append", type=str, metavar="NAME", - help="don't execute this particular crawler." - " Can be specified multiple times to skip multiple crawlers" + help="don't execute this particular crawler. Can be specified multiple times to skip multiple crawlers", ) PARSER.add_argument( "--working-dir", type=Path, metavar="PATH", - help="custom working directory" + help="custom working directory", ) PARSER.add_argument( "--explain", action=BooleanOptionalAction, - help="log and explain in detail what PFERD is doing" + help="log and explain in detail what PFERD is doing", ) PARSER.add_argument( "--status", action=BooleanOptionalAction, - help="print status updates while PFERD is crawling" + help="print status updates while PFERD is crawling", ) PARSER.add_argument( "--report", action=BooleanOptionalAction, - help="print a report of all local changes before exiting" + help="print a report of all local changes before exiting", ) PARSER.add_argument( "--share-cookies", action=BooleanOptionalAction, - help="whether crawlers should share cookies where applicable" + help="whether crawlers should share cookies where applicable", ) PARSER.add_argument( "--show-not-deleted", action=BooleanOptionalAction, - help="print messages in status and report when PFERD did not delete a local only file" + help="print messages in status and report when PFERD did not delete a local only file", ) def 
load_default_section( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: section = parser[parser.default_section] diff --git a/PFERD/config.py b/PFERD/config.py index b2cff4e..7da2889 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -3,7 +3,7 @@ import os import sys from configparser import ConfigParser, SectionProxy from pathlib import Path -from typing import Any, List, NoReturn, Optional, Tuple +from typing import Any, NoReturn, Optional from rich.markup import escape @@ -53,10 +53,10 @@ class Section: raise ConfigOptionError(self.s.name, key, desc) def invalid_value( - self, - key: str, - value: Any, - reason: Optional[str], + self, + key: str, + value: Any, + reason: Optional[str], ) -> NoReturn: if reason is None: self.error(key, f"Invalid value {value!r}") @@ -126,13 +126,13 @@ class Config: with open(path, encoding="utf-8") as f: parser.read_file(f, source=str(path)) except FileNotFoundError: - raise ConfigLoadError(path, "File does not exist") + raise ConfigLoadError(path, "File does not exist") from None except IsADirectoryError: - raise ConfigLoadError(path, "That's a directory, not a file") + raise ConfigLoadError(path, "That's a directory, not a file") from None except PermissionError: - raise ConfigLoadError(path, "Insufficient permissions") + raise ConfigLoadError(path, "Insufficient permissions") from None except UnicodeDecodeError: - raise ConfigLoadError(path, "File is not encoded using UTF-8") + raise ConfigLoadError(path, "File is not encoded using UTF-8") from None def dump(self, path: Optional[Path] = None) -> None: """ @@ -150,8 +150,8 @@ class Config: try: path.parent.mkdir(parents=True, exist_ok=True) - except PermissionError: - raise ConfigDumpError(path, "Could not create parent directory") + except PermissionError as e: + raise ConfigDumpError(path, "Could not create parent directory") from e try: # Ensuring we don't accidentally overwrite any 
existing files by @@ -167,16 +167,16 @@ class Config: with open(path, "w", encoding="utf-8") as f: self._parser.write(f) else: - raise ConfigDumpError(path, "File already exists") + raise ConfigDumpError(path, "File already exists") from None except IsADirectoryError: - raise ConfigDumpError(path, "That's a directory, not a file") - except PermissionError: - raise ConfigDumpError(path, "Insufficient permissions") + raise ConfigDumpError(path, "That's a directory, not a file") from None + except PermissionError as e: + raise ConfigDumpError(path, "Insufficient permissions") from e def dump_to_stdout(self) -> None: self._parser.write(sys.stdout) - def crawl_sections(self) -> List[Tuple[str, SectionProxy]]: + def crawl_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("crawl:"): @@ -184,7 +184,7 @@ class Config: return result - def auth_sections(self) -> List[Tuple[str, SectionProxy]]: + def auth_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("auth:"): diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py index 9a0e080..9ba6a37 100644 --- a/PFERD/crawl/__init__.py +++ b/PFERD/crawl/__init__.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..auth import Authenticator from ..config import Config @@ -8,20 +8,19 @@ from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection -CrawlerConstructor = Callable[[ - str, # Name (without the "crawl:" prefix) - SectionProxy, # Crawler's section of global config - Config, # Global config - Dict[str, Authenticator], # Loaded authenticators by name -], Crawler] +CrawlerConstructor = Callable[ + [ + str, # Name (without the "crawl:" prefix) + 
SectionProxy, # Crawler's section of global config + Config, # Global config + dict[str, Authenticator], # Loaded authenticators by name + ], + Crawler, +] -CRAWLERS: Dict[str, CrawlerConstructor] = { - "local": lambda n, s, c, a: - LocalCrawler(n, LocalCrawlerSection(s), c), - "ilias-web": lambda n, s, c, a: - IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), - "kit-ilias-web": lambda n, s, c, a: - KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), - "kit-ipd": lambda n, s, c, a: - KitIpdCrawler(n, KitIpdCrawlerSection(s), c), +CRAWLERS: dict[str, CrawlerConstructor] = { + "local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c), + "ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), + "kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), + "kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c, a), } diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py index 0e67c02..e2cdf30 100644 --- a/PFERD/crawl/crawler.py +++ b/PFERD/crawl/crawler.py @@ -1,10 +1,10 @@ import asyncio import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Coroutine +from collections.abc import Awaitable, Callable, Coroutine, Sequence from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar +from typing import Any, Optional, TypeVar from ..auth import Authenticator from ..config import Config, Section @@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]): return bar -class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): +class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): super().__init__() @@ -128,12 +128,13 @@ class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): 
def path(self) -> PurePath: return self._path - async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: + async def _on_aenter(self) -> tuple[ProgressBar, FileSink]: await self._stack.enter_async_context(self._limiter.limit_download()) sink = await self._stack.enter_async_context(self._fs_token) # The "Downloaded ..." message is printed in the output dir, not here - bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading", - fmt_path(self._path))) + bar = self._stack.enter_context( + log.download_bar("[bold bright_cyan]", "Downloading", fmt_path(self._path)) + ) return bar, sink @@ -149,9 +150,7 @@ class CrawlerSection(Section): return self.s.getboolean("skip", fallback=False) def output_dir(self, name: str) -> Path: - # TODO Use removeprefix() after switching to 3.9 - if name.startswith("crawl:"): - name = name[len("crawl:"):] + name = name.removeprefix("crawl:") return Path(self.s.get("output_dir", name)).expanduser() def redownload(self) -> Redownload: @@ -206,7 +205,7 @@ class CrawlerSection(Section): on_windows = os.name == "nt" return self.s.getboolean("windows_paths", fallback=on_windows) - def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: + def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: self.missing_value("auth") @@ -218,10 +217,10 @@ class CrawlerSection(Section): class Crawler(ABC): def __init__( - self, - name: str, - section: CrawlerSection, - config: Config, + self, + name: str, + section: CrawlerSection, + config: Config, ) -> None: """ Initialize a crawler from its name and its section in the config file. 
@@ -258,8 +257,12 @@ class Crawler(ABC): def prev_report(self) -> Optional[Report]: return self._output_dir.prev_report + @property + def output_dir(self) -> OutputDirectory: + return self._output_dir + @staticmethod - async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: + async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]: """ Similar to asyncio.gather. However, in the case of an exception, all still running tasks are cancelled and the exception is rethrown. @@ -290,12 +293,39 @@ class Crawler(ABC): log.explain("Answer: Yes") return CrawlToken(self._limiter, path) + def should_try_download( + self, + path: PurePath, + *, + etag_differs: Optional[bool] = None, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, + ) -> bool: + log.explain_topic(f"Decision: Should Download {fmt_path(path)}") + + if self._transformer.transform(path) is None: + log.explain("Answer: No (ignored)") + return False + + should_download = self._output_dir.should_try_download( + path, etag_differs=etag_differs, mtime=mtime, redownload=redownload, on_conflict=on_conflict + ) + if should_download: + log.explain("Answer: Yes") + return True + else: + log.explain("Answer: No") + return False + async def download( - self, - path: PurePath, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, + self, + path: PurePath, + *, + etag_differs: Optional[bool] = None, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, ) -> Optional[DownloadToken]: log.explain_topic(f"Decision: Download {fmt_path(path)}") path = self._deduplicator.mark(path) @@ -307,7 +337,14 @@ class Crawler(ABC): log.status("[bold bright_black]", "Ignored", fmt_path(path)) return None - fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict) + fs_token = 
await self._output_dir.download( + path, + transformed_path, + etag_differs=etag_differs, + mtime=mtime, + redownload=redownload, + on_conflict=on_conflict, + ) if fs_token is None: log.explain("Answer: No") return None @@ -357,7 +394,7 @@ class Crawler(ABC): log.warn("Couldn't find or load old report") return - seen: Set[PurePath] = set() + seen: set[PurePath] = set() for known in sorted(self.prev_report.found_paths): looking_at = list(reversed(known.parents)) + [known] for path in looking_at: diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py index 44ec4dd..49d6013 100644 --- a/PFERD/crawl/http_crawler.py +++ b/PFERD/crawl/http_crawler.py @@ -1,35 +1,39 @@ import asyncio import http.cookies import ssl +from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Dict, List, Optional +from typing import Any, Optional import aiohttp import certifi from aiohttp.client import ClientTimeout +from bs4 import Tag from ..auth import Authenticator from ..config import Config from ..logging import log -from ..utils import fmt_real_path +from ..utils import fmt_real_path, sanitize_path_name from ..version import NAME, VERSION from .crawler import Crawler, CrawlerSection +ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags" + class HttpCrawlerSection(CrawlerSection): def http_timeout(self) -> float: - return self.s.getfloat("http_timeout", fallback=20) + return self.s.getfloat("http_timeout", fallback=30) class HttpCrawler(Crawler): COOKIE_FILE = PurePath(".cookies") def __init__( - self, - name: str, - section: HttpCrawlerSection, - config: Config, - shared_auth: Optional[Authenticator] = None, + self, + name: str, + section: HttpCrawlerSection, + config: Config, + shared_auth: Optional[Authenticator] = None, ) -> None: super().__init__(name, section, config) @@ -39,7 +43,7 @@ class HttpCrawler(Crawler): self._http_timeout = section.http_timeout() self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) - 
self._shared_cookie_jar_paths: Optional[List[Path]] = None + self._shared_cookie_jar_paths: Optional[list[Path]] = None self._shared_auth = shared_auth self._output_dir.register_reserved(self.COOKIE_FILE) @@ -94,7 +98,7 @@ class HttpCrawler(Crawler): """ raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation") - def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None: + def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None: if not self._shared_auth: return @@ -169,24 +173,102 @@ class HttpCrawler(Crawler): log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(str(e)) + @staticmethod + def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath: + """ + Retrieves the hierarchy of headings associated with the give file link and constructs a folder + structure from them. + +
h1
level headings usually only appear once and serve as the page title, so they would introduce + redundant nesting. To avoid this,
h1
headings are ignored via the drop_h1 parameter. + """ + + def find_associated_headings(tag: Tag, level: int) -> PurePath: + if level == 0 or (level == 1 and drop_h1): + return PurePath() + + level_heading = tag.find_previous(name=f"h{level}") + + if level_heading is None: + return find_associated_headings(tag, level - 1) + + folder_name = sanitize_path_name(level_heading.get_text().strip()) + return find_associated_headings(level_heading, level - 1) / folder_name + + # start at level
3
because paragraph-level headings are usually too granular for folder names + return find_associated_headings(file_link, 3) + + def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]: + """ + If available, retrieves the entity tag for a given path which was stored in the previous report. + """ + if not self._output_dir.prev_report: + return None + + etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} + return etags.get(str(path)) + + def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None: + """ + Adds an entity tag for a given path to the report's custom values. + """ + if not etag: + return + + etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} + etags[str(path)] = etag + self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags) + + async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]: + """ + Requests the ETag and Last-Modified headers of a resource via a HEAD request. + If no entity tag / modification date can be obtained, the according value will be None. 
+ """ + try: + async with self.session.head(resource_url) as resp: + if resp.status != 200: + return None, None + + etag_header = resp.headers.get("ETag") + last_modified_header = resp.headers.get("Last-Modified") + last_modified = None + + if last_modified_header: + try: + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives + datetime_format = "%a, %d %b %Y %H:%M:%S GMT" + last_modified = datetime.strptime(last_modified_header, datetime_format) + except ValueError: + # last_modified remains None + pass + + return etag_header, last_modified + except aiohttp.ClientError: + return None, None + async def run(self) -> None: self._request_count = 0 self._cookie_jar = aiohttp.CookieJar() self._load_cookies() async with aiohttp.ClientSession( - headers={"User-Agent": f"{NAME}/{VERSION}"}, - cookie_jar=self._cookie_jar, - connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), - timeout=ClientTimeout( - # 30 minutes. No download in the history of downloads was longer than 30 minutes. - # This is enough to transfer a 600 MB file over a 3 Mib/s connection. - # Allowing an arbitrary value could be annoying for overnight batch jobs - total=15 * 60, - connect=self._http_timeout, - sock_connect=self._http_timeout, - sock_read=self._http_timeout, - ) + headers={"User-Agent": f"{NAME}/{VERSION}"}, + cookie_jar=self._cookie_jar, + connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), + timeout=ClientTimeout( + # 30 minutes. No download in the history of downloads was longer than 30 minutes. + # This is enough to transfer a 600 MB file over a 3 Mib/s connection. 
+ # Allowing an arbitrary value could be annoying for overnight batch jobs + total=15 * 60, + connect=self._http_timeout, + sock_connect=self._http_timeout, + sock_read=self._http_timeout, + ), + # See https://github.com/aio-libs/aiohttp/issues/6626 + # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the + # passed signature. Shibboleth will not accept the broken signature and authentication will + # fail. + requote_redirect_url=False, ) as session: self.session = session try: diff --git a/PFERD/crawl/ilias/__init__.py b/PFERD/crawl/ilias/__init__.py index 287bd3d..fa1aaed 100644 --- a/PFERD/crawl/ilias/__init__.py +++ b/PFERD/crawl/ilias/__init__.py @@ -1,5 +1,9 @@ -from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, - KitIliasWebCrawlerSection) +from .kit_ilias_web_crawler import ( + IliasWebCrawler, + IliasWebCrawlerSection, + KitIliasWebCrawler, + KitIliasWebCrawlerSection, +) __all__ = [ "IliasWebCrawler", diff --git a/PFERD/crawl/ilias/async_helper.py b/PFERD/crawl/ilias/async_helper.py index 527a819..2e6b301 100644 --- a/PFERD/crawl/ilias/async_helper.py +++ b/PFERD/crawl/ilias/async_helper.py @@ -1,5 +1,6 @@ import asyncio -from typing import Any, Callable, Optional +from collections.abc import Callable +from typing import Any, Optional import aiohttp @@ -15,9 +16,9 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla try: return await f(*args, **kwargs) except aiohttp.ContentTypeError: # invalid content type - raise CrawlWarning("ILIAS returned an invalid content type") + raise CrawlWarning("ILIAS returned an invalid content type") from None except aiohttp.TooManyRedirects: - raise CrawlWarning("Got stuck in a redirect loop") + raise CrawlWarning("Got stuck in a redirect loop") from None except aiohttp.ClientPayloadError as e: # encoding or not enough bytes last_exception = e except aiohttp.ClientConnectionError as e: # e.g. 
timeout, disconnect, resolve failed, etc. @@ -25,9 +26,10 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler last_exception = e log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}") + log.explain(f"Last exception: {last_exception!r}") if last_exception: - message = f"Error in I/O Operation: {last_exception}" + message = f"Error in I/O Operation: {last_exception!r}" if failure_is_error: raise CrawlError(message) from last_exception else: diff --git a/PFERD/crawl/ilias/file_templates.py b/PFERD/crawl/ilias/file_templates.py index b206461..c832977 100644 --- a/PFERD/crawl/ilias/file_templates.py +++ b/PFERD/crawl/ilias/file_templates.py @@ -1,5 +1,7 @@ +import dataclasses +import re from enum import Enum -from typing import Optional +from typing import Optional, cast import bs4 @@ -12,7 +14,9 @@ _link_template_fancy = """ ILIAS - Link: {{name}} + + -
- -
-
- {{name}} +
+ +
+ -
{{description}}
+
+
+ {{name}} +
+
{{description}}
+
+
- +
@@ -96,6 +111,7 @@ _link_template_fancy = """ _link_template_internet_shortcut = """ [InternetShortcut] URL={{link}} +Desc={{description}} """.strip() _learning_module_template = """ @@ -126,6 +142,88 @@ _learning_module_template = """ """ +_forum_thread_template = """ + + + + + ILIAS - Forum: {{name}} + + + + {{heading}} + {{content}} + + +""".strip() # noqa: E501 line too long + def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: # Seems to be comments, ignore those. @@ -139,13 +237,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
""" if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): - text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() + text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip() left = f'{text}' else: left = "" if next and body.select_one(".ilc_page_rnav_RightNavigation"): - text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() + text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip() right = f'{text}' else: right = "" @@ -156,12 +254,29 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next ) if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"): - bot_nav.replace_with(soupify(nav_template.replace( - "{{left}}", left).replace("{{right}}", right).encode()) + bot_nav.replace_with( + soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode()) ) - body = body.prettify() - return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) + body_str = body.prettify() + return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name) + + +def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str: + if title := heading.find(name="b"): + title.wrap(bs4.Tag(name="a", attrs={"href": url})) + return ( + _forum_thread_template.replace("{{name}}", name) + .replace("{{heading}}", heading.prettify()) + .replace("{{content}}", content.prettify()) + ) + + +@dataclasses.dataclass +class LinkData: + name: str + url: str + description: str class Links(Enum): @@ -181,6 +296,9 @@ class Links(Enum): return None raise ValueError("Missing switch case") + def collection_as_one(self) -> bool: + return self == Links.FANCY + def extension(self) -> Optional[str]: if self == Links.FANCY: return ".html" @@ -192,10 +310,47 @@ class Links(Enum): return None raise ValueError("Missing switch case") + def interpolate(self, redirect_delay: int, 
collection_name: str, links: list[LinkData]) -> str: + template = self.template() + if template is None: + raise ValueError("Cannot interpolate ignored links") + + if len(links) == 1: + link = links[0] + content = template + content = content.replace("{{link}}", link.url) + content = content.replace("{{name}}", link.name) + content = content.replace("{{description}}", link.description) + content = content.replace("{{redirect_delay}}", str(redirect_delay)) + return content + if self == Links.PLAINTEXT or self == Links.INTERNET_SHORTCUT: + return "\n".join(f"{link.url}" for link in links) + + # All others get coerced to fancy + content = cast(str, Links.FANCY.template()) + repeated_content = cast( + re.Match[str], re.search(r"([\s\S]+)", content) + ).group(1) + + parts = [] + for link in links: + instance = repeated_content + instance = instance.replace("{{link}}", link.url) + instance = instance.replace("{{name}}", link.name) + instance = instance.replace("{{description}}", link.description) + instance = instance.replace("{{redirect_delay}}", str(redirect_delay)) + parts.append(instance) + + content = content.replace(repeated_content, "\n".join(parts)) + content = content.replace("{{name}}", collection_name) + content = re.sub(r"[\s\S]+", "", content) + + return content + @staticmethod def from_string(string: str) -> "Links": try: return Links(string) except ValueError: - raise ValueError("must be one of 'ignore', 'plaintext'," - " 'html', 'internet-shortcut'") + options = [f"'{option.value}'" for option in Links] + raise ValueError(f"must be one of {', '.join(options)}") from None diff --git a/PFERD/crawl/ilias/ilias_html_cleaner.py b/PFERD/crawl/ilias/ilias_html_cleaner.py index 5495304..35a7ea0 100644 --- a/PFERD/crawl/ilias/ilias_html_cleaner.py +++ b/PFERD/crawl/ilias/ilias_html_cleaner.py @@ -1,3 +1,5 @@ +from typing import cast + from bs4 import BeautifulSoup, Comment, Tag _STYLE_TAG_CONTENT = """ @@ -12,6 +14,13 @@ _STYLE_TAG_CONTENT = """ font-weight: bold; 
} + .row-flex { + display: flex; + } + .row-flex-wrap { + flex-wrap: wrap; + } + .accordion-head { background-color: #f5f7fa; padding: 0.5rem 0; @@ -30,6 +39,10 @@ _STYLE_TAG_CONTENT = """ margin: 0.5rem 0; } + img { + background-color: white; + } + body { padding: 1em; grid-template-columns: 1fr min(60rem, 90%) 1fr; @@ -47,12 +60,11 @@ _ARTICLE_WORTHY_CLASSES = [ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: head = soup.new_tag("head") soup.insert(0, head) + # Force UTF-8 encoding + head.append(soup.new_tag("meta", charset="utf-8")) - simplecss_link: Tag = soup.new_tag("link") # - simplecss_link["rel"] = "stylesheet" - simplecss_link["href"] = "https://cdn.simplecss.org/simple.css" - head.append(simplecss_link) + head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css")) # Basic style tags for compat style: Tag = soup.new_tag("style") @@ -63,18 +75,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup: - for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): + for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)): block.name = "article" - for block in soup.find_all("h3"): + for block in cast(list[Tag], soup.find_all("h3")): block.name = "div" - for block in soup.find_all("h1"): + for block in cast(list[Tag], soup.find_all("h1")): block.name = "h3" - for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): + for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")): block.name = "h3" - block["class"] += ["accordion-head"] + block["class"] += ["accordion-head"] # type: ignore for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): children = list(dummy.children) @@ -85,7 +97,12 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup: if isinstance(type(children[0]), Comment): dummy.decompose() - for hrule_imposter in 
soup.find_all(class_="ilc_section_Separator"): + # Delete video figures, as they can not be internalized anyway + for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"): + if figure := video.find_parent("figure"): + figure.decompose() + + for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")): hrule_imposter.insert(0, soup.new_tag("hr")) return soup diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py index b77f4fc..b5041b3 100644 --- a/PFERD/crawl/ilias/ilias_web_crawler.py +++ b/PFERD/crawl/ilias/ilias_web_crawler.py @@ -4,7 +4,7 @@ import os import re from collections.abc import Awaitable, Coroutine from pathlib import PurePath -from typing import Any, Dict, List, Literal, Optional, Set, Union, cast +from typing import Any, Literal, Optional, cast from urllib.parse import urljoin import aiohttp @@ -15,16 +15,29 @@ from ...auth import Authenticator from ...config import Config from ...logging import ProgressBar, log from ...output_dir import FileSink, Redownload -from ...utils import fmt_path, soupify, url_set_query_param +from ...utils import fmt_path, sanitize_path_name, soupify, url_set_query_param from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical from ..http_crawler import HttpCrawler, HttpCrawlerSection from .async_helper import _iorepeat -from .file_templates import Links, learning_module_template +from .file_templates import LinkData, Links, forum_thread_template, learning_module_template from .ilias_html_cleaner import clean, insert_base_markup -from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, - IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) +from .kit_ilias_html import ( + IliasElementType, + IliasForumThread, + IliasLearningModulePage, + IliasPage, + IliasPageElement, + IliasSoup, + parse_ilias_forum_export, +) +from .shibboleth_login import 
ShibbolethLogin -TargetType = Union[str, int] +TargetType = str | int + + +class LoginTypeLocal: + def __init__(self, client_id: str): + self.client_id = client_id class IliasWebCrawlerSection(HttpCrawlerSection): @@ -35,12 +48,28 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return base_url - def client_id(self) -> str: - client_id = self.s.get("client_id") - if not client_id: - self.missing_value("client_id") + def login(self) -> Literal["shibboleth"] | LoginTypeLocal: + login_type = self.s.get("login_type") + if not login_type: + self.missing_value("login_type") + if login_type == "shibboleth": + return "shibboleth" + if login_type == "local": + client_id = self.s.get("client_id") + if not client_id: + self.missing_value("client_id") + return LoginTypeLocal(client_id) - return client_id + self.invalid_value("login_type", login_type, "Should be ") + + def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]: + value: Optional[str] = self.s.get("tfa_auth") + if value is None: + return None + auth = authenticators.get(value) + if auth is None: + self.invalid_value("tfa_auth", value, "No such auth section exists") + return auth def target(self) -> TargetType: target = self.s.get("target") @@ -80,24 +109,25 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return self.s.getboolean("forums", fallback=False) -_DIRECTORY_PAGES: Set[IliasElementType] = { +_DIRECTORY_PAGES: set[IliasElementType] = { IliasElementType.EXERCISE, IliasElementType.EXERCISE_FILES, + IliasElementType.EXERCISE_OVERVIEW, IliasElementType.FOLDER, IliasElementType.INFO_TAB, - IliasElementType.MEETING, IliasElementType.MEDIACAST_VIDEO_FOLDER, + IliasElementType.MEETING, IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, } -_VIDEO_ELEMENTS: Set[IliasElementType] = { - IliasElementType.MEDIACAST_VIDEO_FOLDER, +_VIDEO_ELEMENTS: set[IliasElementType] = { IliasElementType.MEDIACAST_VIDEO, + 
IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO, - IliasElementType.OPENCAST_VIDEO_PLAYER, IliasElementType.OPENCAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, + IliasElementType.OPENCAST_VIDEO_PLAYER, } @@ -141,28 +171,36 @@ class IliasWebCrawler(HttpCrawler): name: str, section: IliasWebCrawlerSection, config: Config, - authenticators: Dict[str, Authenticator] + authenticators: dict[str, Authenticator], ): # Setting a main authenticator for cookie sharing auth = section.auth(authenticators) super().__init__(name, section, config, shared_auth=auth) if section.tasks() > 1: - log.warn(""" + log.warn( + """ Please avoid using too many parallel requests as these are the KIT ILIAS instance's greatest bottleneck. - """.strip()) + """.strip() + ) self._auth = auth self._base_url = section.base_url() - self._client_id = section.client_id() + self._tfa_auth = section.tfa_auth(authenticators) + + self._login_type = section.login() + if isinstance(self._login_type, LoginTypeLocal): + self._client_id = self._login_type.client_id + else: + self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth) self._target = section.target() self._link_file_redirect_delay = section.link_redirect_delay() self._links = section.links() self._videos = section.videos() self._forums = section.forums() - self._visited_urls: Dict[str, PurePath] = dict() + self._visited_urls: dict[str, PurePath] = dict() async def _run(self) -> None: if isinstance(self._target, int): @@ -178,118 +216,89 @@ instance's greatest bottleneck. 
async def _crawl_course(self, course_id: int) -> None: # Start crawling at the given course root_url = url_set_query_param( - urljoin(self._base_url, "/goto.php"), - "target", f"crs_{course_id}", + urljoin(self._base_url + "/", "goto.php"), + "target", + f"crs_{course_id}", ) await self._crawl_url(root_url, expected_id=course_id) async def _crawl_desktop(self) -> None: - appendix = r"ILIAS\Repository\Provider\RepositoryMainBarProvider|mm_pd_sel_items" - appendix = appendix.encode("ASCII").hex() - await self._crawl_url(url_set_query_param( - urljoin(self._base_url, "/gs_content.php"), - "item=", appendix, - )) + await self._crawl_url( + urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True + ) - async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: - maybe_cl = await self.crawl(PurePath(".")) - if not maybe_cl: - return - cl = maybe_cl # Not mypy's fault, but explained here: https://github.com/python/mypy/issues/2608 - - elements: List[IliasPageElement] = [] - # A list as variable redefinitions are not propagated to outer scopes - description: List[BeautifulSoup] = [] - - @_iorepeat(3, "crawling url") - async def gather_elements() -> None: - elements.clear() - async with cl: - next_stage_url: Optional[str] = url - current_parent = None - - # Duplicated code, but the root page is special - we want to avoid fetching it twice! - while next_stage_url: - soup = await self._get_page(next_stage_url, root_page_allowed=True) - - if current_parent is None and expected_id is not None: - perma_link = IliasPage.get_soup_permalink(soup) - if not perma_link or "crs_" not in perma_link: - raise CrawlError("Invalid course id? 
Didn't find anything looking like a course") - - log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") - log.explain(f"URL: {next_stage_url}") - page = IliasPage(soup, next_stage_url, current_parent) - if next_element := page.get_next_stage_element(): - current_parent = next_element - next_stage_url = next_element.url - else: - next_stage_url = None - - elements.extend(page.get_child_elements()) - if info_tab := page.get_info_tab(): - elements.append(info_tab) - if description_string := page.get_description(): - description.append(description_string) - - # Fill up our task list with the found elements - await gather_elements() - - if description: - await self._download_description(PurePath("."), description[0]) - - elements.sort(key=lambda e: e.id()) - - tasks: List[Awaitable[None]] = [] - for element in elements: - if handle := await self._handle_ilias_element(PurePath("."), element): - tasks.append(asyncio.create_task(handle)) - - # And execute them - await self.gather(tasks) + async def _crawl_url( + self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False + ) -> None: + if awaitable := await self._handle_ilias_page( + url, None, PurePath("."), expected_id, crawl_nested_courses + ): + await awaitable async def _handle_ilias_page( self, url: str, - parent: IliasPageElement, + current_element: Optional[IliasPageElement], path: PurePath, + expected_course_id: Optional[int] = None, + crawl_nested_courses: bool = False, ) -> Optional[Coroutine[Any, Any, None]]: maybe_cl = await self.crawl(path) if not maybe_cl: return None - return self._crawl_ilias_page(url, parent, maybe_cl) + if current_element: + self._ensure_not_seen(current_element, path) + + return self._crawl_ilias_page( + url, current_element, maybe_cl, expected_course_id, crawl_nested_courses + ) @anoncritical async def _crawl_ilias_page( self, url: str, - parent: IliasPageElement, + current_element: Optional[IliasPageElement], cl: CrawlToken, + expected_course_id: 
Optional[int] = None, + crawl_nested_courses: bool = False, ) -> None: - elements: List[IliasPageElement] = [] + elements: list[IliasPageElement] = [] # A list as variable redefinitions are not propagated to outer scopes - description: List[BeautifulSoup] = [] + description: list[BeautifulSoup] = [] @_iorepeat(3, "crawling folder") async def gather_elements() -> None: elements.clear() async with cl: next_stage_url: Optional[str] = url - current_parent = parent + current_parent = current_element + page = None while next_stage_url: soup = await self._get_page(next_stage_url) log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") log.explain(f"URL: {next_stage_url}") - page = IliasPage(soup, next_stage_url, current_parent) + + # If we expect to find a root course, enforce it + if current_parent is None and expected_course_id is not None: + perma_link = IliasPage.get_soup_permalink(soup) + if not perma_link or "crs/" not in perma_link: + raise CrawlError("Invalid course id? Didn't find anything looking like a course") + if str(expected_course_id) not in perma_link: + raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") + + page = IliasPage(soup, current_parent) if next_element := page.get_next_stage_element(): current_parent = next_element next_stage_url = next_element.url else: next_stage_url = None + page = cast(IliasPage, page) elements.extend(page.get_child_elements()) + if current_element is None and (info_tab := page.get_info_tab()): + elements.append(info_tab) if description_string := page.get_description(): description.append(description_string) @@ -301,9 +310,9 @@ instance's greatest bottleneck. 
elements.sort(key=lambda e: e.id()) - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] for element in elements: - if handle := await self._handle_ilias_element(cl.path, element): + if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): tasks.append(asyncio.create_task(handle)) # And execute them @@ -316,32 +325,30 @@ instance's greatest bottleneck. # works correctly. @anoncritical async def _handle_ilias_element( - self, - parent_path: PurePath, - element: IliasPageElement, + self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False ) -> Optional[Coroutine[Any, Any, None]]: - if element.url in self._visited_urls: - raise CrawlWarning( - f"Found second path to element {element.name!r} at {element.url!r}. " - + f"First path: {fmt_path(self._visited_urls[element.url])}. " - + f"Second path: {fmt_path(parent_path)}." - ) - self._visited_urls[element.url] = parent_path - # element.name might contain `/` if the crawler created nested elements, # so we can not sanitize it here. We trust in the output dir to thwart worst-case # directory escape attacks. element_path = PurePath(parent_path, element.name) - if element.type in _VIDEO_ELEMENTS: - if not self._videos: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](enable with option 'videos')" - ) - return None + # This is symptomatic of no access to the element, for example, because + # of time availability restrictions. 
+ if "cmdClass=ilInfoScreenGUI" in element.url and "cmd=showSummary" in element.url: + log.explain( + "Skipping element as url points to info screen, " + "this should only happen with not-yet-released elements" + ) + return None + + if element.type in _VIDEO_ELEMENTS and not self._videos: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](enable with option 'videos')", + ) + return None if element.type == IliasElementType.FILE: return await self._handle_file(element, element_path) @@ -351,7 +358,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](enable with option 'forums')" + "[bright_black](enable with option 'forums')", ) return None return await self._handle_forum(element, element_path) @@ -360,7 +367,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](tests contain no relevant data)" + "[bright_black](tests contain no relevant data)", ) return None elif element.type == IliasElementType.SURVEY: @@ -368,7 +375,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](surveys contain no relevant data)" + "[bright_black](surveys contain no relevant data)", ) return None elif element.type == IliasElementType.SCORM_LEARNING_MODULE: @@ -376,13 +383,73 @@ instance's greatest bottleneck. 
"[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](scorm learning modules are not supported)" + "[bright_black](scorm learning modules are not supported)", + ) + return None + elif element.type == IliasElementType.LITERATURE_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](literature lists are not currently supported)", + ) + return None + elif element.type == IliasElementType.LEARNING_MODULE_HTML: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](HTML learning modules are not supported)", + ) + return None + elif element.type == IliasElementType.BLOG: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](blogs are not currently supported)", + ) + return None + elif element.type == IliasElementType.DCL_RECORD_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](dcl record lists are not currently supported)", + ) + return None + elif element.type == IliasElementType.MEDIA_POOL: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](media pools are not currently supported)", + ) + return None + elif element.type == IliasElementType.COURSE: + if crawl_nested_courses: + return await self._handle_ilias_page(element.url, element, element_path) + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](not descending into linked course)", + ) + return None + elif element.type == IliasElementType.WIKI: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](wikis are not currently supported)", ) return None elif element.type == IliasElementType.LEARNING_MODULE: return await self._handle_learning_module(element, element_path) elif element.type == IliasElementType.LINK: return await self._handle_link(element, element_path) + elif element.type == 
IliasElementType.LINK_COLLECTION: + return await self._handle_link(element, element_path) elif element.type == IliasElementType.BOOKING: return await self._handle_booking(element, element_path) elif element.type == IliasElementType.OPENCAST_VIDEO: @@ -391,6 +458,8 @@ instance's greatest bottleneck. return await self._handle_opencast_video(element, element_path) elif element.type == IliasElementType.MEDIACAST_VIDEO: return await self._handle_file(element, element_path) + elif element.type == IliasElementType.MOB_VIDEO: + return await self._handle_file(element, element_path, is_video=True) elif element.type in _DIRECTORY_PAGES: return await self._handle_ilias_page(element.url, element, element_path) else: @@ -406,44 +475,93 @@ instance's greatest bottleneck. log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}") log.explain(f"Links type is {self._links}") - link_template_maybe = self._links.template() - link_extension = self._links.extension() - if not link_template_maybe or not link_extension: + export_url = url_set_query_param(element.url, "cmd", "exportHTML") + resolved = await self._resolve_link_target(export_url) + if resolved == "none": + links = [LinkData(element.name, "", element.description or "")] + else: + links = self._parse_link_content(element, cast(BeautifulSoup, resolved)) + + maybe_extension = self._links.extension() + + if not maybe_extension: log.explain("Answer: No") return None else: log.explain("Answer: Yes") - element_path = element_path.with_name(element_path.name + link_extension) - maybe_dl = await self.download(element_path, mtime=element.mtime) - if not maybe_dl: + if len(links) <= 1 or self._links.collection_as_one(): + element_path = element_path.with_name(element_path.name + maybe_extension) + maybe_dl = await self.download(element_path, mtime=element.mtime) + if not maybe_dl: + return None + return self._download_link(self._links, element.name, links, maybe_dl) + + maybe_cl = await self.crawl(element_path) + if not 
maybe_cl: return None + # Required for download_all closure + cl = maybe_cl + extension = maybe_extension - return self._download_link(element, link_template_maybe, maybe_dl) + async def download_all() -> None: + for link in links: + path = cl.path / (sanitize_path_name(link.name) + extension) + if dl := await self.download(path, mtime=element.mtime): + await self._download_link(self._links, element.name, [link], dl) + + return download_all() @anoncritical @_iorepeat(3, "resolving link") - async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None: - async with dl as (bar, sink): - export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML") - real_url = await self._resolve_link_target(export_url) - self._write_link_content(link_template, real_url, element.name, element.description, sink) - - def _write_link_content( - self, - link_template: str, - url: str, - name: str, - description: Optional[str], - sink: FileSink, + async def _download_link( + self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken ) -> None: - content = link_template - content = content.replace("{{link}}", url) - content = content.replace("{{name}}", name) - content = content.replace("{{description}}", str(description)) - content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay)) - sink.file.write(content.encode("utf-8")) - sink.done() + async with dl as (bar, sink): + rendered = link_renderer.interpolate(self._link_file_redirect_delay, collection_name, links) + sink.file.write(rendered.encode("utf-8")) + sink.done() + + async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]: + async def impl() -> Optional[BeautifulSoup | Literal["none"]]: + async with self.session.get(export_url, allow_redirects=False) as resp: + # No redirect means we were authenticated + if hdrs.LOCATION not in resp.headers: + return soupify(await resp.read()) # 
.select_one("a").get("href").strip() # type: ignore + # We are either unauthenticated or the link is not active + new_url = resp.headers[hdrs.LOCATION].lower() + if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: + return "none" + return None + + auth_id = await self._current_auth_id() + target = await impl() + if target is not None: + return target + + await self.authenticate(auth_id) + + target = await impl() + if target is not None: + return target + + raise CrawlError("resolve_link_target failed even after authenticating") + + @staticmethod + def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]: + links = list(content.select("a")) + if len(links) == 1: + url = str(links[0].get("href")).strip() + return [LinkData(name=element.name, description=element.description or "", url=url)] + + results = [] + for link in links: + url = str(link.get("href")).strip() + name = link.get_text(strip=True) + description = cast(Tag, link.find_next_sibling("dd")).get_text(strip=True) + results.append(LinkData(name=name, description=description, url=url.strip())) + + return results async def _handle_booking( self, @@ -466,7 +584,9 @@ instance's greatest bottleneck. if not maybe_dl: return None - return self._download_booking(element, link_template_maybe, maybe_dl) + self._ensure_not_seen(element, element_path) + + return self._download_booking(element, maybe_dl) @anoncritical @_iorepeat(1, "downloading description") @@ -476,9 +596,10 @@ instance's greatest bottleneck. if not dl: return - async with dl as (bar, sink): + async with dl as (_bar, sink): description = clean(insert_base_markup(description)) - sink.file.write(description.prettify().encode("utf-8")) + description_tag = await self.internalize_images(description) + sink.file.write(description_tag.prettify().encode("utf-8")) sink.done() @anoncritical @@ -486,26 +607,13 @@ instance's greatest bottleneck. 
async def _download_booking( self, element: IliasPageElement, - link_template: str, dl: DownloadToken, ) -> None: async with dl as (bar, sink): - self._write_link_content(link_template, element.url, element.name, element.description, sink) - - async def _resolve_link_target(self, export_url: str) -> str: - async with self.session.get(export_url, allow_redirects=False) as resp: - # No redirect means we were authenticated - if hdrs.LOCATION not in resp.headers: - return soupify(await resp.read()).select_one("a").get("href").strip() - - await self._authenticate() - - async with self.session.get(export_url, allow_redirects=False) as resp: - # No redirect means we were authenticated - if hdrs.LOCATION not in resp.headers: - return soupify(await resp.read()).select_one("a").get("href").strip() - - raise CrawlError("resolve_link_target failed even after authenticating") + links = [LinkData(name=element.name, description=element.description or "", url=element.url)] + rendered = self._links.interpolate(self._link_file_redirect_delay, element.name, links) + sink.file.write(rendered.encode("utf-8")) + sink.done() async def _handle_opencast_video( self, @@ -516,7 +624,7 @@ instance's greatest bottleneck. if self.prev_report: self.report.add_custom_value( _get_video_cache_key(element), - self.prev_report.get_custom_value(_get_video_cache_key(element)) + self.prev_report.get_custom_value(_get_video_cache_key(element)), ) # A video might contain other videos, so let's "crawl" the video first @@ -530,6 +638,8 @@ instance's greatest bottleneck. if not maybe_dl: return None + self._ensure_not_seen(element, element_path) + # If we have every file from the cached mapping already, we can ignore this and bail if self._all_opencast_videos_locally_present(element, maybe_dl.path): # Mark all existing videos as known to ensure they do not get deleted during cleanup. @@ -548,7 +658,7 @@ instance's greatest bottleneck. 
def _previous_contained_opencast_videos( self, element: IliasPageElement, element_path: PurePath - ) -> List[PurePath]: + ) -> list[PurePath]: if not self.prev_report: return [] custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) @@ -586,11 +696,11 @@ instance's greatest bottleneck. def add_to_report(paths: list[str]) -> None: self.report.add_custom_value( _get_video_cache_key(element), - {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))} + {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}, ) async with dl as (bar, sink): - page = IliasPage(await self._get_page(element.url), element.url, element) + page = IliasPage(await self._get_page(element.url), element) stream_elements = page.get_child_elements() if len(stream_elements) > 1: @@ -600,11 +710,11 @@ instance's greatest bottleneck. stream_element = stream_elements[0] # We do not have a local cache yet - await self._stream_from_url(stream_element.url, sink, bar, is_video=True) + await self._stream_from_url(stream_element, sink, bar, is_video=True) add_to_report([str(self._transformer.transform(dl.path))]) return - contained_video_paths: List[str] = [] + contained_video_paths: list[str] = [] for stream_element in stream_elements: video_path = dl.path.parent / stream_element.name @@ -615,7 +725,7 @@ instance's greatest bottleneck. async with maybe_dl as (bar, sink): log.explain(f"Streaming video from real url {stream_element.url}") contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) - await self._stream_from_url(stream_element.url, sink, bar, is_video=True) + await self._stream_from_url(stream_element, sink, bar, is_video=True) add_to_report(contained_video_paths) @@ -623,23 +733,29 @@ instance's greatest bottleneck. 
self, element: IliasPageElement, element_path: PurePath, + is_video: bool = False, ) -> Optional[Coroutine[Any, Any, None]]: maybe_dl = await self.download(element_path, mtime=element.mtime) if not maybe_dl: return None - return self._download_file(element, maybe_dl) + self._ensure_not_seen(element, element_path) + + return self._download_file(element, maybe_dl, is_video) @_iorepeat(3, "downloading file") @anoncritical - async def _download_file(self, element: IliasPageElement, dl: DownloadToken) -> None: + async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: assert dl # The function is only reached when dl is not None async with dl as (bar, sink): - await self._stream_from_url(element.url, sink, bar, is_video=False) + await self._stream_from_url(element, sink, bar, is_video) + + async def _stream_from_url( + self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool + ) -> None: + url = element.url - async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None: async def try_stream() -> bool: next_url = url - # Normal files redirect to the magazine if we are not authenticated. As files could be HTML, # we can not match on the content type here. Instead, we disallow redirects and inspect the # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume @@ -663,6 +779,13 @@ instance's greatest bottleneck. if is_video and "html" in resp.content_type: return False + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range + if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None): + parts = content_range.split("/") + if len(parts) == 2 and parts[1].isdigit(): + bar.set_total(int(parts[1])) + + # Prefer the content length header if resp.content_length: bar.set_total(resp.content_length) @@ -680,7 +803,7 @@ instance's greatest bottleneck. 
await self.authenticate(auth_id) if not await try_stream(): - raise CrawlError("File streaming failed after authenticate()") + raise CrawlError(f"File streaming failed after authenticate() {element!r}") async def _handle_forum( self, @@ -695,36 +818,23 @@ instance's greatest bottleneck. @_iorepeat(3, "crawling forum") @anoncritical async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: List[IliasForumThread] = [] - async with cl: - next_stage_url = element.url - while next_stage_url: - log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") - log.explain(f"URL: {next_stage_url}") - - soup = await self._get_page(next_stage_url) - page = IliasPage(soup, next_stage_url, element) - - if next := page.get_next_stage_element(): - next_stage_url = next.url - else: - break - - download_data = page.get_download_forum_data() - if not download_data: - raise CrawlWarning("Failed to extract forum data") - if download_data.empty: - log.explain("Forum had no threads") + inner = IliasPage(await self._get_page(element.url), element) + export_url = inner.get_forum_export_url() + if not export_url: + log.warn("Could not extract forum export url") return - html = await self._post_authenticated(download_data.url, download_data.form_data) - elements = parse_ilias_forum_export(soupify(html)) - elements.sort(key=lambda elem: elem.title) + export = await self._post( + export_url, + {"format": "html", "cmd[createExportFile]": ""}, + ) - tasks: List[Awaitable[None]] = [] - for elem in elements: - tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem))) + elements = parse_ilias_forum_export(soupify(export)) + + tasks: list[Awaitable[None]] = [] + for thread in elements: + tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) # And execute them await self.gather(tasks) @@ -732,19 +842,18 @@ instance's greatest bottleneck. 
@anoncritical @_iorepeat(3, "saving forum thread") async def _download_forum_thread( - self, - parent_path: PurePath, - element: IliasForumThread, + self, parent_path: PurePath, thread: IliasForumThread | IliasPageElement, forum_url: str ) -> None: - path = parent_path / (_sanitize_path_name(element.title) + ".html") - maybe_dl = await self.download(path, mtime=element.mtime) - if not maybe_dl: + path = parent_path / (sanitize_path_name(thread.name) + ".html") + maybe_dl = await self.download(path, mtime=thread.mtime) + if not maybe_dl or not isinstance(thread, IliasForumThread): return async with maybe_dl as (bar, sink): - content = element.title_tag.prettify() - content += element.content_tag.prettify() - sink.file.write(content.encode("utf-8")) + rendered = forum_thread_template( + thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag) + ) + sink.file.write(rendered.encode("utf-8")) sink.done() async def _handle_learning_module( @@ -755,38 +864,40 @@ instance's greatest bottleneck. 
maybe_cl = await self.crawl(element_path) if not maybe_cl: return None + self._ensure_not_seen(element, element_path) + return self._crawl_learning_module(element, maybe_cl) @_iorepeat(3, "crawling learning module") @anoncritical async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: List[IliasLearningModulePage] = [] + elements: list[IliasLearningModulePage] = [] async with cl: log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain(f"URL: {element.url}") soup = await self._get_page(element.url) - page = IliasPage(soup, element.url, element) + page = IliasPage(soup, element) if next := page.get_learning_module_data(): - elements.extend(await self._crawl_learning_module_direction( - cl.path, next.previous_url, "left", element - )) + elements.extend( + await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element) + ) elements.append(next) - elements.extend(await self._crawl_learning_module_direction( - cl.path, next.next_url, "right", element - )) + elements.extend( + await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element) + ) # Reflect their natural ordering in the file names for index, lm_element in enumerate(elements): lm_element.title = f"{index:02}_{lm_element.title}" - tasks: List[Awaitable[None]] = [] + tasks: list[Awaitable[None]] = [] for index, elem in enumerate(elements): prev_url = elements[index - 1].title if index > 0 else None next_url = elements[index + 1].title if index < len(elements) - 1 else None - tasks.append(asyncio.create_task( - self._download_learning_module_page(cl.path, elem, prev_url, next_url) - )) + tasks.append( + asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url)) + ) # And execute them await self.gather(tasks) @@ -795,10 +906,10 @@ instance's greatest bottleneck. 
self, path: PurePath, start_url: Optional[str], - dir: Union[Literal["left"], Literal["right"]], - parent_element: IliasPageElement - ) -> List[IliasLearningModulePage]: - elements: List[IliasLearningModulePage] = [] + dir: Literal["left"] | Literal["right"], + parent_element: IliasPageElement, + ) -> list[IliasLearningModulePage]: + elements: list[IliasLearningModulePage] = [] if not start_url: return elements @@ -809,13 +920,10 @@ instance's greatest bottleneck. log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain(f"URL: {next_element_url}") soup = await self._get_page(next_element_url) - page = IliasPage(soup, next_element_url, parent_element) + page = IliasPage(soup, parent_element) if next := page.get_learning_module_data(): elements.append(next) - if dir == "left": - next_element_url = next.previous_url - else: - next_element_url = next.next_url + next_element_url = next.previous_url if dir == "left" else next.next_url counter += 1 return elements @@ -827,9 +935,9 @@ instance's greatest bottleneck. parent_path: PurePath, element: IliasLearningModulePage, prev: Optional[str], - next: Optional[str] + next: Optional[str], ) -> None: - path = parent_path / (_sanitize_path_name(element.title) + ".html") + path = parent_path / (sanitize_path_name(element.title) + ".html") maybe_dl = await self.download(path) if not maybe_dl: return @@ -838,17 +946,11 @@ instance's greatest bottleneck. 
return if prev: - prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) - if prev_p: - prev = os.path.relpath(prev_p, my_path.parent) - else: - prev = None + prev_p = self._transformer.transform(parent_path / (sanitize_path_name(prev) + ".html")) + prev = os.path.relpath(prev_p, my_path.parent) if prev_p else None if next: - next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) - if next_p: - next = os.path.relpath(next_p, my_path.parent) - else: - next = None + next_p = self._transformer.transform(parent_path / (sanitize_path_name(next) + ".html")) + next = os.path.relpath(next_p, my_path.parent) if next_p else None async with maybe_dl as (bar, sink): content = element.content @@ -862,25 +964,31 @@ instance's greatest bottleneck. """ log.explain_topic("Internalizing images") for elem in tag.find_all(recursive=True): - if not isinstance(elem, Tag): - continue - if elem.name == "img": - if src := elem.attrs.get("src", None): - url = urljoin(self._base_url, src) - if not url.startswith(self._base_url): - continue - log.explain(f"Internalizing {url!r}") - img = await self._get_authenticated(url) - elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() - if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"): + if elem.name == "img" and (src := elem.attrs.get("src", None)): + url = urljoin(self._base_url, cast(str, src)) + if not url.startswith(self._base_url): + continue + log.explain(f"Internalizing {url!r}") + img = await self._get_authenticated(url) + elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() + if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"): # For unknown reasons the protocol seems to be stripped. 
- elem.attrs["src"] = "https:" + elem.attrs["src"] + elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"]) return tag - async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: + def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None: + if element.url in self._visited_urls: + raise CrawlWarning( + f"Found second path to element {element.name!r} at {element.url!r}. " + + f"First path: {fmt_path(self._visited_urls[element.url])}. " + + f"Second path: {fmt_path(parent_path)}." + ) + self._visited_urls[element.url] = parent_path + + async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup: auth_id = await self._current_auth_id() async with self.session.get(url) as request: - soup = soupify(await request.read()) + soup = IliasSoup(soupify(await request.read()), str(request.url)) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) @@ -889,13 +997,13 @@ instance's greatest bottleneck. # Retry once after authenticating. If this fails, we will die. async with self.session.get(url) as request: - soup = soupify(await request.read()) + soup = IliasSoup(soupify(await request.read()), str(request.url)) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) raise CrawlError(f"get_page failed even after authenticating on {url!r}") @staticmethod - def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: + def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup: if IliasPage.is_root_page(soup) and not root_page_allowed: raise CrawlError( "Unexpectedly encountered ILIAS root page. " @@ -907,29 +1015,15 @@ instance's greatest bottleneck. 
) return soup - async def _post_authenticated( - self, - url: str, - data: dict[str, Union[str, List[str]]] - ) -> bytes: - auth_id = await self._current_auth_id() - + async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes: form_data = aiohttp.FormData() for key, val in data.items(): form_data.add_field(key, val) - async with self.session.post(url, data=form_data(), allow_redirects=False) as request: + async with self.session.post(url, data=form_data()) as request: if request.status == 200: return await request.read() - - # We weren't authenticated, so try to do that - await self.authenticate(auth_id) - - # Retry once after authenticating. If this fails, we will die. - async with self.session.post(url, data=data, allow_redirects=False) as request: - if request.status == 200: - return await request.read() - raise CrawlError("post_authenticated failed even after authenticating") + raise CrawlError(f"post failed with status {request.status}") async def _get_authenticated(self, url: str) -> bytes: auth_id = await self._current_auth_id() @@ -947,63 +1041,34 @@ instance's greatest bottleneck. return await request.read() raise CrawlError("get_authenticated failed even after authenticating") - # ToDo: Is iorepeat still required? 
- @_iorepeat(3, "Login", failure_is_error=True) async def _authenticate(self) -> None: # fill the session with the correct cookies - params = { - "client_id": self._client_id, - "cmd": "force_login", - } - async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: - login_page = soupify(await request.read()) + if self._login_type == "shibboleth": + await self._shibboleth_login.login(self.session) + else: + params = { + "client_id": self._client_id, + "cmd": "force_login", + } + async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: + login_page = soupify(await request.read()) - login_form = login_page.find("form", attrs={"name": "formlogin"}) - if login_form is None: - raise CrawlError("Could not find the login form! Specified client id might be invalid.") + login_form = login_page.find("form", attrs={"name": "login_form"}) + if login_form is None: + raise CrawlError("Could not find the login form! Specified client id might be invalid.") - login_url = login_form.attrs.get("action") - if login_url is None: - raise CrawlError("Could not find the action URL in the login form!") + login_url = cast(Optional[str], login_form.attrs.get("action")) + if login_url is None: + raise CrawlError("Could not find the action URL in the login form!") - username, password = await self._auth.credentials() + username, password = await self._auth.credentials() - login_data = { - "username": username, - "password": password, - "cmd[doStandardAuthentication]": "Login", - } + login_form_data = aiohttp.FormData() + login_form_data.add_field("login_form/input_3/input_4", username) + login_form_data.add_field("login_form/input_3/input_5", password) - # do the actual login - async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: - soup = soupify(await request.read()) - if not self._is_logged_in(soup): - self._auth.invalidate_credentials() - - @staticmethod - def _is_logged_in(soup: 
BeautifulSoup) -> bool: - # Normal ILIAS pages - mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar") - if mainbar is not None: - login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x}) - shib_login = soup.find(id="button_shib_login") - return not login_button and not shib_login - - # Personal Desktop - if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}): - return True - - # Video listing embeds do not have complete ILIAS html. Try to match them by - # their video listing table - video_table = soup.find( - recursive=True, - name="table", - attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")} - ) - if video_table is not None: - return True - # The individual video player wrapper page has nothing of the above. - # Match it by its playerContainer. - if soup.select_one("#playerContainer") is not None: - return True - return False + # do the actual login + async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request: + soup = IliasSoup(soupify(await request.read()), str(request.url)) + if not IliasPage.is_logged_in(soup): + self._auth.invalidate_credentials() diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 34e02ba..5966141 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -1,39 +1,245 @@ import json import re +from collections.abc import Callable from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Dict, List, Optional, Union, cast +from typing import Optional, cast from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup, Tag +from PFERD.crawl import CrawlError +from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log -from PFERD.utils import url_set_query_params +from PFERD.utils import sanitize_path_name, url_set_query_params -TargetType = Union[str, int] 
+TargetType = str | int + + +class TypeMatcher: + class UrlPath: + path: str + + def __init__(self, path: str): + self.path = path + + class UrlParameter: + query: str + + def __init__(self, query: str): + self.query = query + + class ImgSrc: + src: str + + def __init__(self, src: str): + self.src = src + + class ImgAlt: + alt: str + + def __init__(self, alt: str): + self.alt = alt + + class All: + matchers: list["IliasElementMatcher"] + + def __init__(self, matchers: list["IliasElementMatcher"]): + self.matchers = matchers + + class Any: + matchers: list["IliasElementMatcher"] + + def __init__(self, matchers: list["IliasElementMatcher"]): + self.matchers = matchers + + @staticmethod + def path(path: str) -> UrlPath: + return TypeMatcher.UrlPath(path) + + @staticmethod + def query(query: str) -> UrlParameter: + return TypeMatcher.UrlParameter(query) + + @staticmethod + def img_src(src: str) -> ImgSrc: + return TypeMatcher.ImgSrc(src) + + @staticmethod + def img_alt(alt: str) -> ImgAlt: + return TypeMatcher.ImgAlt(alt) + + @staticmethod + def all(*matchers: "IliasElementMatcher") -> All: + return TypeMatcher.All(list(matchers)) + + @staticmethod + def any(*matchers: "IliasElementMatcher") -> Any: + return TypeMatcher.Any(list(matchers)) + + @staticmethod + def never() -> Any: + return TypeMatcher.Any([]) + + +IliasElementMatcher = ( + TypeMatcher.UrlPath + | TypeMatcher.UrlParameter + | TypeMatcher.ImgSrc + | TypeMatcher.ImgAlt + | TypeMatcher.All + | TypeMatcher.Any +) class IliasElementType(Enum): - EXERCISE = "exercise" + BLOG = "blog" + BOOKING = "booking" + COURSE = "course" + DCL_RECORD_LIST = "dcl_record_list" + EXERCISE_OVERVIEW = "exercise_overview" + EXERCISE = "exercise" # own submitted files EXERCISE_FILES = "exercise_files" # own submitted files - TEST = "test" # an online test. Will be ignored currently. 
FILE = "file" FOLDER = "folder" FORUM = "forum" - LINK = "link" + FORUM_THREAD = "forum_thread" INFO_TAB = "info_tab" LEARNING_MODULE = "learning_module" - BOOKING = "booking" - MEETING = "meeting" - SURVEY = "survey" - SCORM_LEARNING_MODULE = "scorm_learning_module" - MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + LEARNING_MODULE_HTML = "learning_module_html" + LITERATURE_LIST = "literature_list" + LINK = "link" + LINK_COLLECTION = "link_collection" + MEDIA_POOL = "media_pool" MEDIACAST_VIDEO = "mediacast_video" + MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + MEETING = "meeting" + MOB_VIDEO = "mob_video" OPENCAST_VIDEO = "opencast_video" - OPENCAST_VIDEO_PLAYER = "opencast_video_player" OPENCAST_VIDEO_FOLDER = "opencast_video_folder" OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated" + OPENCAST_VIDEO_PLAYER = "opencast_video_player" + SCORM_LEARNING_MODULE = "scorm_learning_module" + SURVEY = "survey" + TEST = "test" # an online test. Will be ignored currently. 
+ WIKI = "wiki" + + def matcher(self) -> IliasElementMatcher: + match self: + case IliasElementType.BLOG: + return TypeMatcher.any(TypeMatcher.img_src("_blog.svg")) + case IliasElementType.BOOKING: + return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg")) + case IliasElementType.COURSE: + return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg")) + case IliasElementType.DCL_RECORD_LIST: + return TypeMatcher.any( + TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui") + ) + case IliasElementType.EXERCISE: + return TypeMatcher.never() + case IliasElementType.EXERCISE_FILES: + return TypeMatcher.never() + case IliasElementType.EXERCISE_OVERVIEW: + return TypeMatcher.any( + TypeMatcher.path("/exc/"), + TypeMatcher.path("_exc_"), + TypeMatcher.img_src("_exc.svg"), + ) + case IliasElementType.FILE: + return TypeMatcher.any( + TypeMatcher.query("cmd=sendfile"), + TypeMatcher.path("_file_"), + TypeMatcher.img_src("/filedelivery/"), + ) + case IliasElementType.FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/fold/"), + TypeMatcher.img_src("_fold.svg"), + TypeMatcher.path("/grp/"), + TypeMatcher.img_src("_grp.svg"), + TypeMatcher.path("/copa/"), + TypeMatcher.path("_copa_"), + TypeMatcher.img_src("_copa.svg"), + # Not supported right now but warn users + # TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), + # TypeMatcher.img_alt("medienpool"), + # TypeMatcher.img_src("_mep.svg"), + ) + case IliasElementType.FORUM: + return TypeMatcher.any( + TypeMatcher.path("/frm/"), + TypeMatcher.path("_frm_"), + TypeMatcher.img_src("_frm.svg"), + ) + case IliasElementType.FORUM_THREAD: + return TypeMatcher.never() + case IliasElementType.INFO_TAB: + return TypeMatcher.never() + case IliasElementType.LITERATURE_LIST: + return TypeMatcher.img_src("_bibl.svg") + case IliasElementType.LEARNING_MODULE: + return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg")) + case 
IliasElementType.LEARNING_MODULE_HTML: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg") + ) + case IliasElementType.LINK: + return TypeMatcher.any( + TypeMatcher.all( + TypeMatcher.query("baseclass=illinkresourcehandlergui"), + TypeMatcher.query("calldirectlink"), + ), + TypeMatcher.img_src("_webr.svg"), # duplicated :( + ) + case IliasElementType.LINK_COLLECTION: + return TypeMatcher.any( + TypeMatcher.query("baseclass=illinkresourcehandlergui"), + TypeMatcher.img_src("_webr.svg"), # duplicated :( + ) + case IliasElementType.MEDIA_POOL: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg") + ) + case IliasElementType.MEDIACAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.MEDIACAST_VIDEO_FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/mcst/"), + TypeMatcher.query("baseclass=ilmediacasthandlergui"), + TypeMatcher.img_src("_mcst.svg"), + ) + case IliasElementType.MEETING: + return TypeMatcher.any(TypeMatcher.img_src("_sess.svg")) + case IliasElementType.MOB_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: + return TypeMatcher.img_alt("opencast") + case IliasElementType.OPENCAST_VIDEO_PLAYER: + return TypeMatcher.never() + case IliasElementType.SCORM_LEARNING_MODULE: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg") + ) + case IliasElementType.SURVEY: + return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg")) + case IliasElementType.TEST: + return TypeMatcher.any( + TypeMatcher.query("cmdclass=ilobjtestgui"), + TypeMatcher.query("cmdclass=iltestscreengui"), + TypeMatcher.img_src("_tst.svg"), + ) + case IliasElementType.WIKI: + return 
TypeMatcher.any(
+                    TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg")
+                )
+
+        raise CrawlWarning(f"Unknown matcher {self}")
 
 
 @dataclass
@@ -47,14 +253,25 @@ class IliasPageElement:
     def id(self) -> str:
         regexes = [
             r"eid=(?P[0-9a-z\-]+)",
-            r"file_(?P\d+)",
-            r"copa_(?P\d+)",
-            r"fold_(?P\d+)",
-            r"frm_(?P\d+)",
-            r"exc_(?P\d+)",
+            r"book/(?P\d+)",  # booking
+            r"cat/(?P\d+)",
+            r"copa/(?P\d+)",  # content page
+            r"crs/(?P\d+)",  # course
+            r"exc/(?P\d+)",  # exercise
+            r"file/(?P\d+)",  # file
+            r"fold/(?P\d+)",  # folder
+            r"frm/(?P\d+)",  # forum
+            r"grp/(?P\d+)",  # group
+            r"lm/(?P\d+)",  # learning module
+            r"mcst/(?P\d+)",  # mediacast
+            r"pg/(?P(\d|_)+)",  # page?
+            r"svy/(?P\d+)",  # survey
+            r"sess/(?P\d+)",  # session
+            r"webr/(?P\d+)",  # web reference (link)
+            r"thr_pk=(?P\d+)",  # forums
             r"ref_id=(?P\d+)",
             r"target=[a-z]+_(?P\d+)",
-            r"mm_(?P\d+)"
+            r"mm_(?P\d+)",
         ]
 
         for regex in regexes:
@@ -72,15 +289,15 @@ class IliasPageElement:
         name: str,
         mtime: Optional[datetime] = None,
         description: Optional[str] = None,
-        skip_sanitize: bool = False
-    ) -> 'IliasPageElement':
+        skip_sanitize: bool = False,
+    ) -> "IliasPageElement":
         if typ == IliasElementType.MEETING:
             normalized = IliasPageElement._normalize_meeting_name(name)
             log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
             name = normalized
 
         if not skip_sanitize:
-            name = _sanitize_path_name(name)
+            name = sanitize_path_name(name)
 
         return IliasPageElement(typ, url, name, mtime, description)
 
@@ -92,7 +309,7 @@ class IliasPageElement:
         """
 
         # This checks whether we can reach a `:` without passing a `-`
-        if re.search(r"^[^-]+: ", meeting_name):
+        if re.search(r"^[^-]+: ", meeting_name):  # noqa: SIM108
            # Meeting name only contains date: "05. 
Jan 2000:" split_delimiter = ":" else: @@ -115,14 +332,14 @@ class IliasPageElement: @dataclass class IliasDownloadForumData: url: str - form_data: Dict[str, Union[str, List[str]]] + form_data: dict[str, str | list[str]] empty: bool @dataclass class IliasForumThread: - title: str - title_tag: Tag + name: str + name_tag: Tag content_tag: Tag mtime: Optional[datetime] @@ -135,21 +352,30 @@ class IliasLearningModulePage: previous_url: Optional[str] -class IliasPage: +class IliasSoup: + soup: BeautifulSoup + page_url: str - def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): - self._soup = soup - self._page_url = _page_url + def __init__(self, soup: BeautifulSoup, page_url: str): + self.soup = soup + self.page_url = page_url + + +class IliasPage: + def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]): + self._ilias_soup = ilias_soup + self._soup = ilias_soup.soup + self._page_url = ilias_soup.page_url self._page_type = source_element.type if source_element else None self._source_name = source_element.name if source_element else "" @staticmethod - def is_root_page(soup: BeautifulSoup) -> bool: + def is_root_page(soup: IliasSoup) -> bool: if permalink := IliasPage.get_soup_permalink(soup): - return "goto.php?target=root_" in permalink + return "goto.php/root/" in permalink return False - def get_child_elements(self) -> List[IliasPageElement]: + def get_child_elements(self) -> list[IliasPageElement]: """ Return all child page elements you can find here. 
""" @@ -176,22 +402,25 @@ class IliasPage: def get_info_tab(self) -> Optional[IliasPageElement]: tab: Optional[Tag] = self._soup.find( - name="a", - attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x} + name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x} ) if tab is not None: return IliasPageElement.create_new( - IliasElementType.INFO_TAB, - self._abs_url_from_link(tab), - "infos" + IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos" ) return None def get_description(self) -> Optional[BeautifulSoup]: - def is_interesting_class(name: str) -> bool: - return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] + def is_interesting_class(name: str | None) -> bool: + return name in [ + "ilCOPageSection", + "ilc_Paragraph", + "ilc_va_ihcap_VAccordIHeadCap", + "ilc_va_ihcap_AccordIHeadCap", + "ilc_media_cont_MediaContainer", + ] - paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class) + paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class)) if not paragraphs: return None @@ -202,6 +431,20 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue + if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")): + # We have an embedded video which should be downloaded by _find_mob_videos + url, title = self._find_mob_video_url_title(video, p) + raw_html += '
External Video: {title}' + else: + raw_html += f"Video elided. Filename: '{title}'." + raw_html += "
\n" + continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -215,13 +458,13 @@ class IliasPage: def get_learning_module_data(self) -> Optional[IliasLearningModulePage]: if not self._is_learning_module_page(): return None - content = self._soup.select_one("#ilLMPageContent") - title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip() + content = cast(Tag, self._soup.select_one("#ilLMPageContent")) + title = cast(Tag, self._soup.select_one(".ilc_page_title_PageTitle")).get_text().strip() return IliasLearningModulePage( title=title, content=content, next_url=self._find_learning_module_next(), - previous_url=self._find_learning_module_prev() + previous_url=self._find_learning_module_prev(), ) def _find_learning_module_next(self) -> Optional[str]: @@ -240,29 +483,28 @@ class IliasPage: return url return None - def get_download_forum_data(self) -> Optional[IliasDownloadForumData]: - form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x}) - if not form: + def get_forum_export_url(self) -> Optional[str]: + forum_link = self._soup.select_one("#tab_forums_threads > a") + if not forum_link: + log.explain("Found no forum link") return None - post_url = self._abs_url_from_relative(form["action"]) - thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})] + base_url = self._abs_url_from_link(forum_link) + base_url = re.sub(r"cmd=\w+", "cmd=post", base_url) + base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url) - form_data: Dict[str, Union[str, List[str]]] = { - "thread_ids[]": thread_ids, - "selected_cmd2": "html", - "select_cmd2": "Ausführen", - "selected_cmd": "", - } + rtoken_form = self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}) + if not rtoken_form: + log.explain("Found no rtoken anywhere") + return None + match = cast(re.Match[str], re.search(r"rtoken=(\w+)", 
str(rtoken_form.attrs["action"]))) + rtoken = match.group(1) - return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0) + base_url = base_url + "&rtoken=" + rtoken + + return base_url def get_next_stage_element(self) -> Optional[IliasPageElement]: - if self._is_forum_page(): - if "trows=800" in self._page_url: - return None - log.explain("Requesting *all* forum threads") - return self._get_show_max_forum_entries_per_page_url() if self._is_ilias_opencast_embedding(): log.explain("Unwrapping opencast embedding") return self.get_child_elements()[0] @@ -272,6 +514,8 @@ class IliasPage: if self._contains_collapsed_future_meetings(): log.explain("Requesting *all* future meetings") return self._uncollapse_future_meetings_url() + if self._is_exercise_not_all_shown(): + return self._show_all_exercises() if not self._is_content_tab_selected(): if self._page_type != IliasElementType.INFO_TAB: log.explain("Selecting content tab") @@ -280,13 +524,6 @@ class IliasPage: log.explain("Crawling info tab, skipping content select") return None - def _is_forum_page(self) -> bool: - read_more_btn = self._soup.find( - "button", - attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x} - ) - return read_more_btn is not None - def _is_video_player(self) -> bool: return "paella_config_file" in str(self._soup) @@ -295,38 +532,36 @@ class IliasPage: return True # Raw listing without ILIAS fluff - video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) + video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) return video_element_table is not None def _is_ilias_opencast_embedding(self) -> bool: # ILIAS fluff around the real opencast html if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "opencast" in element.attrs["src"].lower(): + element: Tag = cast(Tag, self._soup.find(id="headerimage")) + if "opencast" in cast(str, 
element.attrs["src"]).lower(): return True return False def _is_exercise_file(self) -> bool: # we know it from before - if self._page_type == IliasElementType.EXERCISE: + if self._page_type == IliasElementType.EXERCISE_OVERVIEW: return True # We have no suitable parent - let's guesss if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "exc" in element.attrs["src"].lower(): + element: Tag = cast(Tag, self._soup.find(id="headerimage")) + if "exc" in cast(str, element.attrs["src"]).lower(): return True return False def _is_personal_desktop(self) -> bool: - return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}) + return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower() def _is_content_page(self) -> bool: if link := self.get_permalink(): - return "target=copa_" in link + return "/copa/" in link return False def _is_learning_module_page(self) -> bool: @@ -340,13 +575,23 @@ class IliasPage: def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]: element = self._soup.find( "a", - attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)} + attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}, ) if not element: return None link = self._abs_url_from_link(element) return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings") + def _is_exercise_not_all_shown(self) -> bool: + return ( + self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower() + ) + + def _show_all_exercises(self) -> Optional[IliasPageElement]: + return IliasPageElement.create_new( + IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises" + ) + def _is_content_tab_selected(self) -> bool: return self._select_content_page_url() is None @@ -359,31 +604,28 @@ class IliasPage: def _select_content_page_url(self) -> 
Optional[IliasPageElement]: tab = self._soup.find( - id="tab_view_content", - attrs={"class": lambda x: x is not None and "active" not in x} + id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x} ) # Already selected (or not found) if not tab: return None link = tab.find("a") if link: - link = self._abs_url_from_link(link) - return IliasPageElement.create_new(IliasElementType.FOLDER, link, "select content page") + link_str = self._abs_url_from_link(link) + return IliasPageElement.create_new(IliasElementType.FOLDER, link_str, "select content page") _unexpected_html_warning() log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.") log.warn_contd("PFERD might not find content on the course's main page.") return None - def _player_to_video(self) -> List[IliasPageElement]: + def _player_to_video(self) -> list[IliasPageElement]: # Fetch the actual video page. This is a small wrapper page initializing a javscript # player. Sadly we can not execute that JS. The actual video stream url is nowhere # on the page, but defined in a JS object inside a script tag, passed to the player # library. 
# We do the impossible and RegEx the stream JSON object out of the page's HTML source - regex = re.compile( - r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE - ) + regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE) json_match = regex.search(str(self._soup)) if json_match is None: @@ -411,56 +653,77 @@ class IliasPage: return items - def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]: + def _get_show_max_forum_entries_per_page_url( + self, wanted_max: Optional[int] = None + ) -> Optional[IliasPageElement]: correct_link = self._soup.find( - "a", - attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x} + "a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x} ) if not correct_link: return None link = self._abs_url_from_link(correct_link) + if wanted_max is not None: + link = link.replace("trows=800", f"trows={wanted_max}") return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads") - def _find_personal_desktop_entries(self) -> List[IliasPageElement]: - items: List[IliasPageElement] = [] + def _get_forum_thread_count(self) -> Optional[int]: + log.explain_topic("Trying to find forum thread count") - titles: List[Tag] = self._soup.select(".il-item-title") + candidates = cast(list[Tag], self._soup.select(".ilTableFootLight")) + extract_regex = re.compile(r"\s(?P\d+)\s*\)") + + for candidate in candidates: + log.explain(f"Found thread count candidate: {candidate}") + if match := extract_regex.search(candidate.get_text()): + return int(match.group("max")) + else: + log.explain("Found no candidates to extract thread count from") + + return None + + def _find_personal_desktop_entries(self) -> list[IliasPageElement]: + items: list[IliasPageElement] = [] + + titles: list[Tag] = self._soup.select("#block_pditems_0 .il-item-title") for title in titles: link = title.find("a") - name = 
_sanitize_path_name(link.text.strip()) + + if not link: + log.explain(f"Skipping offline item: {title.get_text().strip()!r}") + continue + + name = sanitize_path_name(link.text.strip()) url = self._abs_url_from_link(link) if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url: # Configure button/link does not have anything interesting continue - type = self._find_type_from_link(name, link, url) - if not type: + typ = IliasPage._find_type_for_element( + name, url, lambda: IliasPage._find_icon_for_folder_entry(cast(Tag, link)) + ) + if not typ: _unexpected_html_warning() log.warn_contd(f"Could not extract type for {link}") continue - log.explain(f"Found {name!r}") + log.explain(f"Found {name!r} of type {typ}") - if type == IliasElementType.FILE and "_download" not in url: - url = re.sub(r"(target=file_\d+)", r"\1_download", url) - log.explain("Rewired file URL to include download part") - - items.append(IliasPageElement.create_new(type, url, name)) + items.append(IliasPageElement.create_new(typ, url, name)) return items - def _find_copa_entries(self) -> List[IliasPageElement]: - items: List[IliasPageElement] = [] - links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink") + def _find_copa_entries(self) -> list[IliasPageElement]: + items: list[IliasPageElement] = [] + links: list[Tag] = cast(list[Tag], self._soup.find_all(class_="ilc_flist_a_FileListItemLink")) for link in links: url = self._abs_url_from_link(link) - name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "") - name = _sanitize_path_name(name) + name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.get_text()).strip().replace("\t", "") + name = sanitize_path_name(name) if "file_id" not in url: _unexpected_html_warning() @@ -471,24 +734,26 @@ class IliasPage: return items - def _find_info_tab_entries(self) -> List[IliasPageElement]: + def _find_info_tab_entries(self) -> list[IliasPageElement]: items = [] - links: List[Tag] = 
self._soup.select("a.il_ContainerItemCommand") + links: list[Tag] = self._soup.select("a.il_ContainerItemCommand") for link in links: - if "cmdClass=ilobjcoursegui" not in link["href"]: + log.explain(f"Found info tab link: {self._abs_url_from_link(link)}") + if "cmdclass=ilobjcoursegui" not in cast(str, link["href"]).lower(): continue - if "cmd=sendfile" not in link["href"]: + if "cmd=sendfile" not in cast(str, link["href"]).lower(): continue - items.append(IliasPageElement.create_new( - IliasElementType.FILE, - self._abs_url_from_link(link), - _sanitize_path_name(link.getText()) - )) + items.append( + IliasPageElement.create_new( + IliasElementType.FILE, self._abs_url_from_link(link), sanitize_path_name(link.get_text()) + ) + ) + log.explain(f"Found {len(items)} info tab entries {items}") return items - def _find_opencast_video_entries(self) -> List[IliasPageElement]: + def _find_opencast_video_entries(self) -> list[IliasPageElement]: # ILIAS has three stages for video pages # 1. The initial dummy page without any videos. This page contains the link to the listing # 2. The video listing which might be paginated @@ -496,14 +761,12 @@ class IliasPage: # # We need to figure out where we are. 
- video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) + video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if video_element_table is None: # We are in stage 1 # The page is actually emtpy but contains the link to stage 2 - content_link: Tag = self._soup.select_one("#tab_series a") + content_link: Tag = cast(Tag, self._soup.select_one("#tab_series a")) url: str = self._abs_url_from_link(content_link) query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(url, query_params) @@ -514,43 +777,42 @@ class IliasPage: is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: + if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER: # We are in stage 2 - try to break pagination return self._find_opencast_video_entries_paginated() return self._find_opencast_video_entries_no_paging() - def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]: - table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + def _find_opencast_video_entries_paginated(self) -> list[IliasPageElement]: + table_element = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if table_element is None: log.warn("Couldn't increase elements per page (table not found). I might miss elements.") return self._find_opencast_video_entries_no_paging() - id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) + id_match = re.match(r"tbl_xoct_(.+)", cast(str, table_element.attrs["id"])) if id_match is None: log.warn("Couldn't increase elements per page (table id not found). 
I might miss elements.") return self._find_opencast_video_entries_no_paging() table_id = id_match.group(1) - query_params = {f"tbl_xoct_{table_id}_trows": "800", - "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(self._page_url, query_params) log.explain("Disabled pagination, retrying folder as a new entry") return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] - def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: + def _find_opencast_video_entries_no_paging(self) -> list[IliasPageElement]: """ Crawls the "second stage" video page. This page contains the actual video urls. """ # Video start links are marked with an "Abspielen" link - video_links: List[Tag] = self._soup.findAll( - name="a", text=re.compile(r"\s*(Abspielen|Play)\s*") + video_links = cast( + list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")) ) - results: List[IliasPageElement] = [] + results: list[IliasPageElement] = [] for link in video_links: results.append(self._listed_opencast_video_to_element(link)) @@ -562,12 +824,10 @@ class IliasPage: # 6th or 7th child (1 indexed) is the modification time string. 
Try to find it # by parsing backwards from the end and finding something that looks like a date modification_time = None - row: Tag = link.parent.parent.parent + row: Tag = link.parent.parent.parent # type: ignore column_count = len(row.select("td.std")) for index in range(column_count, 0, -1): - modification_string = link.parent.parent.parent.select_one( - f"td.std:nth-child({index})" - ).getText().strip() + modification_string = cast(Tag, row.select_one(f"td.std:nth-child({index})")).get_text().strip() if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string): modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M") break @@ -576,10 +836,10 @@ class IliasPage: log.warn(f"Could not determine upload time for {link}") modification_time = datetime.now() - title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() + title = cast(Tag, row.select_one("td.std:nth-child(3)")).get_text().strip() title += ".mp4" - video_name: str = _sanitize_path_name(title) + video_name: str = sanitize_path_name(title) video_url = self._abs_url_from_link(link) @@ -588,114 +848,133 @@ class IliasPage: IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time ) - def _find_exercise_entries(self) -> List[IliasPageElement]: + def _find_exercise_entries(self) -> list[IliasPageElement]: if self._soup.find(id="tab_submission"): - log.explain("Found submission tab. This is an exercise detail page") - return self._find_exercise_entries_detail_page() + log.explain("Found submission tab. This is an exercise detail or files page") + if self._soup.select_one("#tab_submission.active") is None: + log.explain(" This is a details page") + return self._find_exercise_entries_detail_page() + else: + log.explain(" This is a files page") + return self._find_exercise_entries_files_page() + log.explain("Found no submission tab. 
This is an exercise root page") return self._find_exercise_entries_root_page() - def _find_exercise_entries_detail_page(self) -> List[IliasPageElement]: - results: List[IliasPageElement] = [] + def _find_exercise_entries_detail_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] - # Find all download links in the container (this will contain all the files) - download_links: List[Tag] = self._soup.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmd=download" in x}, - text="Download" + if link := self._soup.select_one("#tab_submission > a"): + results.append( + IliasPageElement.create_new( + IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission" + ) + ) + else: + log.explain("Found no submission link for exercise, maybe it has not started yet?") + + # Find all download links in the container (this will contain all the *feedback* files) + download_links = cast( + list[Tag], + self._soup.find_all( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x is not None and "cmd=download" in x}, + text="Download", + ), ) for link in download_links: - parent_row: Tag = link.findParent("tr") - children: List[Tag] = parent_row.findChildren("td") + parent_row: Tag = cast( + Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x}) + ) + name_tag = parent_row.find(name="div") - name = _sanitize_path_name(children[1].getText().strip()) + if not name_tag: + log.warn("Could not find name tag for exercise entry") + _unexpected_html_warning() + continue + + name = sanitize_path_name(name_tag.get_text().strip()) log.explain(f"Found exercise detail entry {name!r}") + results.append( + IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name) + ) + + return results + + def _find_exercise_entries_files_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] + + # Find all 
download links in the container + download_links = cast( + list[Tag], + self._soup.find_all( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x is not None and "cmd=download" in x}, + text="Download", + ), + ) + + for link in download_links: + parent_row: Tag = cast(Tag, link.find_parent("tr")) + children = cast(list[Tag], parent_row.find_all("td")) + + name = sanitize_path_name(children[1].get_text().strip()) + log.explain(f"Found exercise file entry {name!r}") + + date = None for child in reversed(children): - date = demangle_date(child.getText().strip(), fail_silently=True) + date = demangle_date(child.get_text().strip(), fail_silently=True) if date is not None: break if date is None: - log.warn(f"Date parsing failed for exercise entry {name!r}") + log.warn(f"Date parsing failed for exercise file entry {name!r}") - results.append(IliasPageElement.create_new( - IliasElementType.FILE, - self._abs_url_from_link(link), - name, - date - )) + results.append( + IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date) + ) return results - def _find_exercise_entries_root_page(self) -> List[IliasPageElement]: - results: List[IliasPageElement] = [] + def _find_exercise_entries_root_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] - # Each assignment is in an accordion container - assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer") + content_tab = self._soup.find(id="ilContentContainer") + if not content_tab: + log.warn("Could not find content tab in exercise overview page") + _unexpected_html_warning() + return [] - for container in assignment_containers: - # Fetch the container name out of the header to use it in the path - container_name = container.select_one(".ilAssignmentHeader").getText().strip() - log.explain(f"Found exercise container {container_name!r}") + exercise_links = content_tab.select(".il-item-title a") - # Find all 
download links in the container (this will contain all the files) - files: List[Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, - text="Download" + for exercise in cast(list[Tag], exercise_links): + if "href" not in exercise.attrs: + continue + href = exercise.attrs["href"] + if type(href) is not str: + continue + if "ass_id=" not in href or "cmdclass=ilassignmentpresentationgui" not in href.lower(): + continue + + name = sanitize_path_name(exercise.get_text().strip()) + results.append( + IliasPageElement.create_new( + IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name + ) ) - # Grab each file as you now have the link - for file_link in files: - # Two divs, side by side. Left is the name, right is the link ==> get left - # sibling - file_name = file_link.parent.findPrevious(name="div").getText().strip() - url = self._abs_url_from_link(file_link) - - log.explain(f"Found exercise entry {file_name!r}") - results.append(IliasPageElement.create_new( - IliasElementType.FILE, - url, - _sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), - mtime=None, # We do not have any timestamp - skip_sanitize=True - )) - - # Find all links to file listings (e.g. 
"Submitted Files" for groups) - file_listings: List[Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()} - ) - - # Add each listing as a new - for listing in file_listings: - parent_container: Tag = listing.findParent( - "div", attrs={"class": lambda x: x and "form-group" in x} - ) - label_container: Tag = parent_container.find( - attrs={"class": lambda x: x and "control-label" in x} - ) - file_name = label_container.getText().strip() - url = self._abs_url_from_link(listing) - log.explain(f"Found exercise detail {file_name!r} at {url}") - results.append(IliasPageElement.create_new( - IliasElementType.EXERCISE_FILES, - url, - _sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), - None, # we do not have any timestamp - skip_sanitize=True - )) + for result in results: + log.explain(f"Found exercise {result.name!r}") return results - def _find_normal_entries(self) -> List[IliasPageElement]: - result: List[IliasPageElement] = [] + def _find_normal_entries(self) -> list[IliasPageElement]: + result: list[IliasPageElement] = [] - links: List[Tag] = [] + links: list[Tag] = [] # Fetch all links and throw them to the general interpreter if self._is_course_overview_page(): log.explain("Page is a course overview page, adjusting link selector") @@ -706,15 +985,17 @@ class IliasPage: for link in links: abs_url = self._abs_url_from_link(link) # Make sure parents are sanitized. 
We do not want accidental parents - parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)] + parents = [sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)] if parents: - element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText()) + element_name = "/".join(parents) + "/" + sanitize_path_name(link.get_text()) else: - element_name = _sanitize_path_name(link.getText()) + element_name = sanitize_path_name(link.get_text()) - element_type = self._find_type_from_link(element_name, link, abs_url) - description = self._find_link_description(link) + element_type = IliasPage._find_type_for_element( + element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link) + ) + description = IliasPage._find_link_description(link) # The last meeting on every page is expanded by default. # Its content is then shown inline *and* in the meeting page itself. @@ -725,60 +1006,112 @@ class IliasPage: if not element_type: continue elif element_type == IliasElementType.FILE: - result.append(self._file_to_element(element_name, abs_url, link)) + result.append(IliasPage._file_to_element(element_name, abs_url, link)) continue - log.explain(f"Found {element_name!r}") - result.append(IliasPageElement.create_new( - element_type, - abs_url, - element_name, - description=description, - skip_sanitize=True - )) + log.explain(f"Found {element_name!r} of type {element_type}") + result.append( + IliasPageElement.create_new( + element_type, abs_url, element_name, description=description, skip_sanitize=True + ) + ) result += self._find_cards() result += self._find_mediacast_videos() + result += self._find_mob_videos() return result - def _find_mediacast_videos(self) -> List[IliasPageElement]: - videos: List[IliasPageElement] = [] + def _find_mediacast_videos(self) -> list[IliasPageElement]: + videos: list[IliasPageElement] = [] - for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")): - element_name = 
_sanitize_path_name( - elem.select_one(".ilPlayerPreviewDescription").getText().strip() - ) - if not element_name.endswith(".mp4"): - # just to make sure it has some kinda-alrightish ending - element_name = element_name + ".mp4" - video_element = elem.find(name="video") - if not video_element: - _unexpected_html_warning() - log.warn_contd(f"No