diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs deleted file mode 100644 index 27246bf..0000000 --- a/.git-blame-ignore-revs +++ /dev/null @@ -1 +0,0 @@ -2cf0e060ed126537dd993896b6aa793e2a6b9e80 diff --git a/.github/workflows/build-and-release.yml b/.github/workflows/build-and-release.yml index 9cd962f..1f60c59 100644 --- a/.github/workflows/build-and-release.yml +++ b/.github/workflows/build-and-release.yml @@ -14,17 +14,23 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-13, macos-latest] - python: ["3.11"] + python: ["3.9"] steps: - uses: actions/checkout@v4 - - name: Install uv - uses: astral-sh/setup-uv@v7 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python }} - name: Set up project - run: uv sync + if: matrix.os != 'windows-latest' + run: ./scripts/setup + + - name: Set up project on windows + if: matrix.os == 'windows-latest' + # For some reason, `pip install --upgrade pip` doesn't work on + # 'windows-latest'. The installed pip version works fine however. + run: ./scripts/setup --no-pip - name: Run checks run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a2848c..e18f88a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,61 +22,6 @@ ambiguous situations. ## Unreleased -## Added -- Store the description when using the `internet-shortcut` link format -- Support for basic auth with the kit-ipd crawler - -## Fixed -- Event loop errors on Windows with Python 3.14 -- Sanitize `/` in headings in kit-ipd crawler -- Crawl info tab again - -## 3.8.3 - 2025-07-01 - -## Added -- Support for link collections. - In "fancy" mode, a single HTML file with multiple links is generated. - In all other modes, PFERD creates a folder for the collection and a new file - for every link inside. - -## Fixed -- Crawling of exercises with instructions -- Don't download unavailable elements. - Elements that are unavailable (for example, because their availability is - time restricted) will not download the HTML for the info page anymore. -- `base_url` argument for `ilias-web` crawler causing crashes - -## 3.8.2 - 2025-04-29 - -## Changed -- Explicitly mention that wikis are not supported at the moment and ignore them - -## Fixed -- Ilias-native login -- Exercise crawling - -## 3.8.1 - 2025-04-17 - -## Fixed -- Description html files now specify at UTF-8 encoding -- Images in descriptions now always have a white background - -## 3.8.0 - 2025-04-16 - -### Added -- Support for ILIAS 9 - -### Changed -- Added prettier CSS to forum threads -- Downloaded forum threads now link to the forum instead of the ILIAS thread -- Increase minimum supported Python version to 3.11 -- Do not crawl nested courses (courses linked in other courses) - -## Fixed -- File links in report on Windows -- TOTP authentication in KIT Shibboleth -- Forum crawling only considering the first 20 entries - ## 3.7.0 - 2024-11-13 ### Added diff --git a/CONFIG.md b/CONFIG.md index b87f75c..9b79be8 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -153,7 +153,6 @@ requests is likely a good idea. - `link_regex`: A regex that is matched against the `href` part of links. If it matches, the given link is downloaded as a file. This is used to extract files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`) -- `auth`: Name of auth section to use for basic authentication. (Optional) ### The `ilias-web` crawler @@ -164,15 +163,13 @@ out of the box for the corresponding universities: [ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" -| University | `base_url` | `login_type` | `client_id` | -|-----------------|-----------------------------------------|--------------|---------------| -| FH Aachen | https://www.ili.fh-aachen.de | local | elearning | -| HHU Düsseldorf | https://ilias.hhu.de | local | UniRZ | -| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk | -| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ | -| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart | -| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | | -| KIT ILIAS Pilot | https://pilot.ilias.studium.kit.edu | shibboleth | pilot | +| University | `base_url` | `login_type` | `client_id` | +|---------------|-----------------------------------------|--------------|---------------| +| FH Aachen | https://www.ili.fh-aachen.de | local | elearning | +| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk | +| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ | +| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart | +| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | | If your university isn't listed, try navigating to your instance's login page. Assuming no custom login service is used, the URL will look something like this: diff --git a/DEV.md b/DEV.md index 8cc42c2..f577b93 100644 --- a/DEV.md +++ b/DEV.md @@ -9,25 +9,30 @@ particular [this][ppug-1] and [this][ppug-2] guide). ## Setting up a dev environment -The use of [venv][venv] and [uv][uv] is recommended. To initially set up a -development environment, run these commands in the same directory as this file: +The use of [venv][venv] is recommended. To initially set up a development +environment, run these commands in the same directory as this file: ``` -$ uv sync +$ python -m venv .venv $ . .venv/bin/activate +$ ./scripts/setup ``` -This install all required dependencies and tools. It also installs PFERD as -*editable*, which means that you can just run `pferd` as if it was installed -normally. Since PFERD was installed with `--editable`, there is no need to -re-run `uv sync` when the source code is changed. +The setup script installs a few required dependencies and tools. It also +installs PFERD via `pip install --editable .`, which means that you can just run +`pferd` as if it was installed normally. Since PFERD was installed with +`--editable`, there is no need to re-run `pip install` when the source code is +changed. + +If you get any errors because pip can't update itself, try running +`./scripts/setup --no-pip` instead of `./scripts/setup`. For more details, see [this part of the Python Tutorial][venv-tut] and [this section on "development mode"][ppug-dev]. [venv]: "venv - Creation of virtual environments" [venv-tut]: "12. Virtual Environments and Packages" -[uv]: "uv - An extremely fast Python package and project manager" +[ppug-dev]: "Working in “development mode”" ## Checking and formatting the code diff --git a/PFERD/__main__.py b/PFERD/__main__.py index 2de9dbc..cb8c67c 100644 --- a/PFERD/__main__.py +++ b/PFERD/__main__.py @@ -133,8 +133,7 @@ def main() -> None: # https://bugs.python.org/issue39232 # https://github.com/encode/httpx/issues/914#issuecomment-780023632 # TODO Fix this properly - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() loop.run_until_complete(pferd.run(args.debug_transforms)) loop.run_until_complete(asyncio.sleep(1)) loop.close() diff --git a/PFERD/auth/__init__.py b/PFERD/auth/__init__.py index 7295c7a..aa3ba8e 100644 --- a/PFERD/auth/__init__.py +++ b/PFERD/auth/__init__.py @@ -1,5 +1,5 @@ -from collections.abc import Callable from configparser import SectionProxy +from typing import Callable, Dict from ..config import Config from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 @@ -9,19 +9,21 @@ from .pass_ import PassAuthenticator, PassAuthSection from .simple import SimpleAuthenticator, SimpleAuthSection from .tfa import TfaAuthenticator -AuthConstructor = Callable[ - [ - str, # Name (without the "auth:" prefix) - SectionProxy, # Authenticator's section of global config - Config, # Global config - ], - Authenticator, -] +AuthConstructor = Callable[[ + str, # Name (without the "auth:" prefix) + SectionProxy, # Authenticator's section of global config + Config, # Global config +], Authenticator] -AUTHENTICATORS: dict[str, AuthConstructor] = { - "credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), - "keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)), - "pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)), - "simple": lambda n, s, c: SimpleAuthenticator(n, SimpleAuthSection(s)), - "tfa": lambda n, s, c: TfaAuthenticator(n), +AUTHENTICATORS: Dict[str, AuthConstructor] = { + "credential-file": lambda n, s, c: + CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), + "keyring": lambda n, s, c: + KeyringAuthenticator(n, KeyringAuthSection(s)), + "pass": lambda n, s, c: + PassAuthenticator(n, PassAuthSection(s)), + "simple": lambda n, s, c: + SimpleAuthenticator(n, SimpleAuthSection(s)), + "tfa": lambda n, s, c: + TfaAuthenticator(n), } diff --git a/PFERD/auth/authenticator.py b/PFERD/auth/authenticator.py index 417b7ba..643a2d5 100644 --- a/PFERD/auth/authenticator.py +++ b/PFERD/auth/authenticator.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Tuple from ..config import Section @@ -34,7 +35,7 @@ class Authenticator(ABC): self.name = name @abstractmethod - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: pass async def username(self) -> str: diff --git a/PFERD/auth/credential_file.py b/PFERD/auth/credential_file.py index cb7834c..94ffa73 100644 --- a/PFERD/auth/credential_file.py +++ b/PFERD/auth/credential_file.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Tuple from ..config import Config from ..utils import fmt_real_path @@ -22,9 +23,7 @@ class CredentialFileAuthenticator(Authenticator): with open(path, encoding="utf-8") as f: lines = list(f) except UnicodeDecodeError: - raise AuthLoadError( - f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8" - ) from None + raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8") except OSError as e: raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e @@ -43,5 +42,5 @@ class CredentialFileAuthenticator(Authenticator): self._username = uline[9:] self._password = pline[9:] - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: return self._username, self._password diff --git a/PFERD/auth/keyring.py b/PFERD/auth/keyring.py index 414640a..c14f6fb 100644 --- a/PFERD/auth/keyring.py +++ b/PFERD/auth/keyring.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple import keyring @@ -17,6 +17,7 @@ class KeyringAuthSection(AuthSection): class KeyringAuthenticator(Authenticator): + def __init__(self, name: str, section: KeyringAuthSection) -> None: super().__init__(name) @@ -27,7 +28,7 @@ class KeyringAuthenticator(Authenticator): self._password_invalidated = False self._username_fixed = section.username() is not None - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: # Request the username if self._username is None: async with log.exclusive_output(): diff --git a/PFERD/auth/pass_.py b/PFERD/auth/pass_.py index c5d9b24..4c8e775 100644 --- a/PFERD/auth/pass_.py +++ b/PFERD/auth/pass_.py @@ -1,5 +1,6 @@ import re import subprocess +from typing import List, Tuple from ..logging import log from .authenticator import Authenticator, AuthError, AuthSection @@ -11,11 +12,11 @@ class PassAuthSection(AuthSection): self.missing_value("passname") return value - def username_prefixes(self) -> list[str]: + def username_prefixes(self) -> List[str]: value = self.s.get("username_prefixes", "login,username,user") return [prefix.lower() for prefix in value.split(",")] - def password_prefixes(self) -> list[str]: + def password_prefixes(self) -> List[str]: value = self.s.get("password_prefixes", "password,pass,secret") return [prefix.lower() for prefix in value.split(",")] @@ -30,14 +31,14 @@ class PassAuthenticator(Authenticator): self._username_prefixes = section.username_prefixes() self._password_prefixes = section.password_prefixes() - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: log.explain_topic("Obtaining credentials from pass") try: log.explain(f"Calling 'pass show {self._passname}'") result = subprocess.check_output(["pass", "show", self._passname], text=True) except subprocess.CalledProcessError as e: - raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e + raise AuthError(f"Failed to get password info from {self._passname}: {e}") prefixed = {} unprefixed = [] diff --git a/PFERD/auth/simple.py b/PFERD/auth/simple.py index dea4b67..831c12f 100644 --- a/PFERD/auth/simple.py +++ b/PFERD/auth/simple.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple from ..logging import log from ..utils import agetpass, ainput @@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator): self._username_fixed = self.username is not None self._password_fixed = self.password is not None - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: if self._username is not None and self._password is not None: return self._username, self._password diff --git a/PFERD/auth/tfa.py b/PFERD/auth/tfa.py index 6ae48fe..26b1383 100644 --- a/PFERD/auth/tfa.py +++ b/PFERD/auth/tfa.py @@ -1,3 +1,5 @@ +from typing import Tuple + from ..logging import log from ..utils import ainput from .authenticator import Authenticator, AuthError @@ -15,7 +17,7 @@ class TfaAuthenticator(Authenticator): code = await ainput("TFA code: ") return code - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: raise AuthError("TFA authenticator does not support usernames") def invalidate_username(self) -> None: diff --git a/PFERD/cli/command_ilias_web.py b/PFERD/cli/command_ilias_web.py index b68e48f..77a1657 100644 --- a/PFERD/cli/command_ilias_web.py +++ b/PFERD/cli/command_ilias_web.py @@ -21,20 +21,23 @@ GROUP.add_argument( "--base-url", type=str, metavar="BASE_URL", - help="The base url of the ilias instance", + help="The base url of the ilias instance" ) GROUP.add_argument( "--client-id", type=str, metavar="CLIENT_ID", - help="The client id of the ilias instance", + help="The client id of the ilias instance" ) configure_common_group_args(GROUP) -def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None: +def load( + args: argparse.Namespace, + parser: configparser.ConfigParser, +) -> None: log.explain(f"Creating config for command '{COMMAND_NAME}'") parser["crawl:ilias"] = {} @@ -42,8 +45,8 @@ def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None: load_crawler(args, section) section["type"] = COMMAND_NAME - if args.base_url is not None: - section["base_url"] = args.base_url + if args.ilias_url is not None: + section["base_url"] = args.ilias_url if args.client_id is not None: section["client_id"] = args.client_id diff --git a/PFERD/cli/command_kit_ilias_web.py b/PFERD/cli/command_kit_ilias_web.py index b3b45c5..10797c2 100644 --- a/PFERD/cli/command_kit_ilias_web.py +++ b/PFERD/cli/command_kit_ilias_web.py @@ -21,8 +21,8 @@ configure_common_group_args(GROUP) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain(f"Creating config for command '{COMMAND_NAME}'") diff --git a/PFERD/cli/command_kit_ipd.py b/PFERD/cli/command_kit_ipd.py index a80af03..b53e67e 100644 --- a/PFERD/cli/command_kit_ipd.py +++ b/PFERD/cli/command_kit_ipd.py @@ -18,30 +18,25 @@ GROUP.add_argument( "--link-regex", type=str, metavar="REGEX", - help="href-matching regex to identify downloadable files", -) -GROUP.add_argument( - "--basic-auth", - action="store_true", - help="enable basic authentication", + help="href-matching regex to identify downloadable files" ) GROUP.add_argument( "target", type=str, metavar="TARGET", - help="url to crawl", + help="url to crawl" ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory", + help="output directory" ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'kit-ipd'") @@ -55,11 +50,5 @@ def load( if args.link_regex: section["link_regex"] = str(args.link_regex) - if args.basic_auth: - section["auth"] = "auth:kit-ipd" - parser["auth:kit-ipd"] = {} - auth_section = parser["auth:kit-ipd"] - auth_section["type"] = "simple" - SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_local.py b/PFERD/cli/command_local.py index 6016afa..309c42f 100644 --- a/PFERD/cli/command_local.py +++ b/PFERD/cli/command_local.py @@ -18,37 +18,37 @@ GROUP.add_argument( "target", type=Path, metavar="TARGET", - help="directory to crawl", + help="directory to crawl" ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory", + help="output directory" ) GROUP.add_argument( "--crawl-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for crawl requests", + help="artificial delay to simulate for crawl requests" ) GROUP.add_argument( "--download-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for download requests", + help="artificial delay to simulate for download requests" ) GROUP.add_argument( "--download-speed", type=int, metavar="BYTES_PER_SECOND", - help="download speed to simulate", + help="download speed to simulate" ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'local'") diff --git a/PFERD/cli/common_ilias_args.py b/PFERD/cli/common_ilias_args.py index edad6da..bbbbee5 100644 --- a/PFERD/cli/common_ilias_args.py +++ b/PFERD/cli/common_ilias_args.py @@ -12,60 +12,58 @@ def configure_common_group_args(group: argparse._ArgumentGroup) -> None: "target", type=str, metavar="TARGET", - help="course id, 'desktop', or ILIAS URL to crawl", + help="course id, 'desktop', or ILIAS URL to crawl" ) group.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory", + help="output directory" ) group.add_argument( - "--username", - "-u", + "--username", "-u", type=str, metavar="USERNAME", - help="user name for authentication", + help="user name for authentication" ) group.add_argument( "--keyring", action=BooleanOptionalAction, - help="use the system keyring to store and retrieve passwords", + help="use the system keyring to store and retrieve passwords" ) group.add_argument( "--credential-file", type=Path, metavar="PATH", - help="read username and password from a credential file", + help="read username and password from a credential file" ) group.add_argument( "--links", type=show_value_error(Links.from_string), metavar="OPTION", - help="how to represent external links", + help="how to represent external links" ) group.add_argument( "--link-redirect-delay", type=int, metavar="SECONDS", - help="time before 'fancy' links redirect to to their target (-1 to disable)", + help="time before 'fancy' links redirect to to their target (-1 to disable)" ) group.add_argument( "--videos", action=BooleanOptionalAction, - help="crawl and download videos", + help="crawl and download videos" ) group.add_argument( "--forums", action=BooleanOptionalAction, - help="crawl and download forum posts", + help="crawl and download forum posts" ) group.add_argument( - "--http-timeout", - "-t", + "--http-timeout", "-t", type=float, metavar="SECONDS", - help="timeout for all HTTP requests", + help="timeout for all HTTP requests" ) diff --git a/PFERD/cli/parser.py b/PFERD/cli/parser.py index c9bec13..be483fd 100644 --- a/PFERD/cli/parser.py +++ b/PFERD/cli/parser.py @@ -1,9 +1,8 @@ import argparse import configparser from argparse import ArgumentTypeError -from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Optional +from typing import Any, Callable, List, Optional, Sequence, Union from ..output_dir import OnConflict, Redownload from ..version import NAME, VERSION @@ -16,15 +15,15 @@ class ParserLoadError(Exception): # TODO Replace with argparse version when updating to 3.9? class BooleanOptionalAction(argparse.Action): def __init__( - self, - option_strings: list[str], - dest: Any, - default: Any = None, - type: Any = None, - choices: Any = None, - required: Any = False, - help: Any = None, - metavar: Any = None, + self, + option_strings: List[str], + dest: Any, + default: Any = None, + type: Any = None, + choices: Any = None, + required: Any = False, + help: Any = None, + metavar: Any = None, ): if len(option_strings) != 1: raise ValueError("There must be exactly one option string") @@ -49,11 +48,11 @@ class BooleanOptionalAction(argparse.Action): ) def __call__( - self, - parser: argparse.ArgumentParser, - namespace: argparse.Namespace, - values: str | Sequence[Any] | None, - option_string: Optional[str] = None, + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None, ) -> None: if option_string and option_string in self.option_strings: value = not option_string.startswith("--no-") @@ -68,13 +67,11 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]: Some validation functions (like the from_string in our enums) raise a ValueError. Argparse only pretty-prints ArgumentTypeErrors though, so we need to wrap our ValueErrors. """ - def wrapper(input: str) -> Any: try: return inner(input) except ValueError as e: - raise ArgumentTypeError(e) from e - + raise ArgumentTypeError(e) return wrapper @@ -84,57 +81,52 @@ CRAWLER_PARSER_GROUP = CRAWLER_PARSER.add_argument_group( description="arguments common to all crawlers", ) CRAWLER_PARSER_GROUP.add_argument( - "--redownload", - "-r", + "--redownload", "-r", type=show_value_error(Redownload.from_string), metavar="OPTION", - help="when to download a file that's already present locally", + help="when to download a file that's already present locally" ) CRAWLER_PARSER_GROUP.add_argument( "--on-conflict", type=show_value_error(OnConflict.from_string), metavar="OPTION", - help="what to do when local and remote files or directories differ", + help="what to do when local and remote files or directories differ" ) CRAWLER_PARSER_GROUP.add_argument( - "--transform", - "-T", + "--transform", "-T", action="append", type=str, metavar="RULE", - help="add a single transformation rule. Can be specified multiple times", + help="add a single transformation rule. Can be specified multiple times" ) CRAWLER_PARSER_GROUP.add_argument( - "--tasks", - "-n", + "--tasks", "-n", type=int, metavar="N", - help="maximum number of concurrent tasks (crawling, downloading)", + help="maximum number of concurrent tasks (crawling, downloading)" ) CRAWLER_PARSER_GROUP.add_argument( - "--downloads", - "-N", + "--downloads", "-N", type=int, metavar="N", - help="maximum number of tasks that may download data at the same time", + help="maximum number of tasks that may download data at the same time" ) CRAWLER_PARSER_GROUP.add_argument( - "--task-delay", - "-d", + "--task-delay", "-d", type=float, metavar="SECONDS", - help="time the crawler should wait between subsequent tasks", + help="time the crawler should wait between subsequent tasks" ) CRAWLER_PARSER_GROUP.add_argument( "--windows-paths", action=BooleanOptionalAction, - help="whether to repair invalid paths on windows", + help="whether to repair invalid paths on windows" ) def load_crawler( - args: argparse.Namespace, - section: configparser.SectionProxy, + args: argparse.Namespace, + section: configparser.SectionProxy, ) -> None: if args.redownload is not None: section["redownload"] = args.redownload.value @@ -160,79 +152,79 @@ PARSER.add_argument( version=f"{NAME} {VERSION} (https://github.com/Garmelon/PFERD)", ) PARSER.add_argument( - "--config", - "-c", + "--config", "-c", type=Path, metavar="PATH", - help="custom config file", + help="custom config file" ) PARSER.add_argument( "--dump-config", action="store_true", - help="dump current configuration to the default config path and exit", + help="dump current configuration to the default config path and exit" ) PARSER.add_argument( "--dump-config-to", metavar="PATH", - help="dump current configuration to a file and exit. Use '-' as path to print to stdout instead", + help="dump current configuration to a file and exit." + " Use '-' as path to print to stdout instead" ) PARSER.add_argument( "--debug-transforms", action="store_true", - help="apply transform rules to files of previous run", + help="apply transform rules to files of previous run" ) PARSER.add_argument( - "--crawler", - "-C", + "--crawler", "-C", action="append", type=str, metavar="NAME", - help="only execute a single crawler. Can be specified multiple times to execute multiple crawlers", + help="only execute a single crawler." + " Can be specified multiple times to execute multiple crawlers" ) PARSER.add_argument( - "--skip", - "-S", + "--skip", "-S", action="append", type=str, metavar="NAME", - help="don't execute this particular crawler. Can be specified multiple times to skip multiple crawlers", + help="don't execute this particular crawler." + " Can be specified multiple times to skip multiple crawlers" ) PARSER.add_argument( "--working-dir", type=Path, metavar="PATH", - help="custom working directory", + help="custom working directory" ) PARSER.add_argument( "--explain", action=BooleanOptionalAction, - help="log and explain in detail what PFERD is doing", + help="log and explain in detail what PFERD is doing" ) PARSER.add_argument( "--status", action=BooleanOptionalAction, - help="print status updates while PFERD is crawling", + help="print status updates while PFERD is crawling" ) PARSER.add_argument( "--report", action=BooleanOptionalAction, - help="print a report of all local changes before exiting", + help="print a report of all local changes before exiting" ) PARSER.add_argument( "--share-cookies", action=BooleanOptionalAction, - help="whether crawlers should share cookies where applicable", + help="whether crawlers should share cookies where applicable" ) PARSER.add_argument( "--show-not-deleted", action=BooleanOptionalAction, - help="print messages in status and report when PFERD did not delete a local only file", + help="print messages in status and report when PFERD did not delete a local only file" ) def load_default_section( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: section = parser[parser.default_section] diff --git a/PFERD/config.py b/PFERD/config.py index 7da2889..b2cff4e 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -3,7 +3,7 @@ import os import sys from configparser import ConfigParser, SectionProxy from pathlib import Path -from typing import Any, NoReturn, Optional +from typing import Any, List, NoReturn, Optional, Tuple from rich.markup import escape @@ -53,10 +53,10 @@ class Section: raise ConfigOptionError(self.s.name, key, desc) def invalid_value( - self, - key: str, - value: Any, - reason: Optional[str], + self, + key: str, + value: Any, + reason: Optional[str], ) -> NoReturn: if reason is None: self.error(key, f"Invalid value {value!r}") @@ -126,13 +126,13 @@ class Config: with open(path, encoding="utf-8") as f: parser.read_file(f, source=str(path)) except FileNotFoundError: - raise ConfigLoadError(path, "File does not exist") from None + raise ConfigLoadError(path, "File does not exist") except IsADirectoryError: - raise ConfigLoadError(path, "That's a directory, not a file") from None + raise ConfigLoadError(path, "That's a directory, not a file") except PermissionError: - raise ConfigLoadError(path, "Insufficient permissions") from None + raise ConfigLoadError(path, "Insufficient permissions") except UnicodeDecodeError: - raise ConfigLoadError(path, "File is not encoded using UTF-8") from None + raise ConfigLoadError(path, "File is not encoded using UTF-8") def dump(self, path: Optional[Path] = None) -> None: """ @@ -150,8 +150,8 @@ class Config: try: path.parent.mkdir(parents=True, exist_ok=True) - except PermissionError as e: - raise ConfigDumpError(path, "Could not create parent directory") from e + except PermissionError: + raise ConfigDumpError(path, "Could not create parent directory") try: # Ensuring we don't accidentally overwrite any existing files by @@ -167,16 +167,16 @@ class Config: with open(path, "w", encoding="utf-8") as f: self._parser.write(f) else: - raise ConfigDumpError(path, "File already exists") from None + raise ConfigDumpError(path, "File already exists") except IsADirectoryError: - raise ConfigDumpError(path, "That's a directory, not a file") from None - except PermissionError as e: - raise ConfigDumpError(path, "Insufficient permissions") from e + raise ConfigDumpError(path, "That's a directory, not a file") + except PermissionError: + raise ConfigDumpError(path, "Insufficient permissions") def dump_to_stdout(self) -> None: self._parser.write(sys.stdout) - def crawl_sections(self) -> list[tuple[str, SectionProxy]]: + def crawl_sections(self) -> List[Tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("crawl:"): @@ -184,7 +184,7 @@ class Config: return result - def auth_sections(self) -> list[tuple[str, SectionProxy]]: + def auth_sections(self) -> List[Tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("auth:"): diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py index 9ba6a37..9a0e080 100644 --- a/PFERD/crawl/__init__.py +++ b/PFERD/crawl/__init__.py @@ -1,5 +1,5 @@ -from collections.abc import Callable from configparser import SectionProxy +from typing import Callable, Dict from ..auth import Authenticator from ..config import Config @@ -8,19 +8,20 @@ from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection -CrawlerConstructor = Callable[ - [ - str, # Name (without the "crawl:" prefix) - SectionProxy, # Crawler's section of global config - Config, # Global config - dict[str, Authenticator], # Loaded authenticators by name - ], - Crawler, -] +CrawlerConstructor = Callable[[ + str, # Name (without the "crawl:" prefix) + SectionProxy, # Crawler's section of global config + Config, # Global config + Dict[str, Authenticator], # Loaded authenticators by name +], Crawler] -CRAWLERS: dict[str, CrawlerConstructor] = { - "local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c), - "ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), - "kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), - "kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c, a), +CRAWLERS: Dict[str, CrawlerConstructor] = { + "local": lambda n, s, c, a: + LocalCrawler(n, LocalCrawlerSection(s), c), + "ilias-web": lambda n, s, c, a: + IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), + "kit-ilias-web": lambda n, s, c, a: + KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), + "kit-ipd": lambda n, s, c, a: + KitIpdCrawler(n, KitIpdCrawlerSection(s), c), } diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py index e2cdf30..fda1307 100644 --- a/PFERD/crawl/crawler.py +++ b/PFERD/crawl/crawler.py @@ -1,10 +1,10 @@ import asyncio import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Callable, Coroutine, Sequence +from collections.abc import Awaitable, Coroutine from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Optional, TypeVar +from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar from ..auth import Authenticator from ..config import Config, Section @@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]): return bar -class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): +class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): super().__init__() @@ -128,13 +128,12 @@ class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): def path(self) -> PurePath: return self._path - async def _on_aenter(self) -> tuple[ProgressBar, FileSink]: + async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: await self._stack.enter_async_context(self._limiter.limit_download()) sink = await self._stack.enter_async_context(self._fs_token) # The "Downloaded ..." message is printed in the output dir, not here - bar = self._stack.enter_context( - log.download_bar("[bold bright_cyan]", "Downloading", fmt_path(self._path)) - ) + bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading", + fmt_path(self._path))) return bar, sink @@ -150,7 +149,9 @@ class CrawlerSection(Section): return self.s.getboolean("skip", fallback=False) def output_dir(self, name: str) -> Path: - name = name.removeprefix("crawl:") + # TODO Use removeprefix() after switching to 3.9 + if name.startswith("crawl:"): + name = name[len("crawl:"):] return Path(self.s.get("output_dir", name)).expanduser() def redownload(self) -> Redownload: @@ -205,7 +206,7 @@ class CrawlerSection(Section): on_windows = os.name == "nt" return self.s.getboolean("windows_paths", fallback=on_windows) - def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator: + def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: self.missing_value("auth") @@ -217,10 +218,10 @@ class CrawlerSection(Section): class Crawler(ABC): def __init__( - self, - name: str, - section: CrawlerSection, - config: Config, + self, + name: str, + section: CrawlerSection, + config: Config, ) -> None: """ Initialize a crawler from its name and its section in the config file. @@ -262,7 +263,7 @@ class Crawler(ABC): return self._output_dir @staticmethod - async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]: + async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: """ Similar to asyncio.gather. However, in the case of an exception, all still running tasks are cancelled and the exception is rethrown. @@ -293,39 +294,14 @@ class Crawler(ABC): log.explain("Answer: Yes") return CrawlToken(self._limiter, path) - def should_try_download( - self, - path: PurePath, - *, - etag_differs: Optional[bool] = None, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, - ) -> bool: - log.explain_topic(f"Decision: Should Download {fmt_path(path)}") - - if self._transformer.transform(path) is None: - log.explain("Answer: No (ignored)") - return False - - should_download = self._output_dir.should_try_download( - path, etag_differs=etag_differs, mtime=mtime, redownload=redownload, on_conflict=on_conflict - ) - if should_download: - log.explain("Answer: Yes") - return True - else: - log.explain("Answer: No") - return False - async def download( - self, - path: PurePath, - *, - etag_differs: Optional[bool] = None, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, + self, + path: PurePath, + *, + etag_differs: Optional[bool] = None, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, ) -> Optional[DownloadToken]: log.explain_topic(f"Decision: Download {fmt_path(path)}") path = self._deduplicator.mark(path) @@ -343,7 +319,7 @@ class Crawler(ABC): etag_differs=etag_differs, mtime=mtime, redownload=redownload, - on_conflict=on_conflict, + on_conflict=on_conflict ) if fs_token is None: log.explain("Answer: No") @@ -394,7 +370,7 @@ class Crawler(ABC): log.warn("Couldn't find or load old report") return - seen: set[PurePath] = set() + seen: Set[PurePath] = set() for known in sorted(self.prev_report.found_paths): looking_at = list(reversed(known.parents)) + [known] for path in looking_at: diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py index 49d6013..2cc97e1 100644 --- a/PFERD/crawl/http_crawler.py +++ b/PFERD/crawl/http_crawler.py @@ -3,7 +3,7 @@ import http.cookies import ssl from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Optional +from typing import Any, Dict, List, Optional, Tuple import aiohttp import certifi @@ -13,7 +13,7 @@ from bs4 import Tag from ..auth import Authenticator from ..config import Config from ..logging import log -from ..utils import fmt_real_path, sanitize_path_name +from ..utils import fmt_real_path from ..version import NAME, VERSION from .crawler import Crawler, CrawlerSection @@ -22,18 +22,18 @@ ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags" class HttpCrawlerSection(CrawlerSection): def http_timeout(self) -> float: - return self.s.getfloat("http_timeout", fallback=30) + return self.s.getfloat("http_timeout", fallback=20) class HttpCrawler(Crawler): COOKIE_FILE = PurePath(".cookies") def __init__( - self, - name: str, - section: HttpCrawlerSection, - config: Config, - shared_auth: Optional[Authenticator] = None, + self, + name: str, + section: HttpCrawlerSection, + config: Config, + shared_auth: Optional[Authenticator] = None, ) -> None: super().__init__(name, section, config) @@ -43,7 +43,7 @@ class HttpCrawler(Crawler): self._http_timeout = section.http_timeout() self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) - self._shared_cookie_jar_paths: Optional[list[Path]] = None + self._shared_cookie_jar_paths: Optional[List[Path]] = None self._shared_auth = shared_auth self._output_dir.register_reserved(self.COOKIE_FILE) @@ -98,7 +98,7 @@ class HttpCrawler(Crawler): """ raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation") - def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None: + def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None: if not self._shared_auth: return @@ -192,7 +192,7 @@ class HttpCrawler(Crawler): if level_heading is None: return find_associated_headings(tag, level - 1) - folder_name = sanitize_path_name(level_heading.get_text().strip()) + folder_name = level_heading.getText().strip() return find_associated_headings(level_heading, level - 1) / folder_name # start at level

because paragraph-level headings are usually too granular for folder names @@ -219,7 +219,7 @@ class HttpCrawler(Crawler): etags[str(path)] = etag self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags) - async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]: + async def _request_resource_version(self, resource_url: str) -> Tuple[Optional[str], Optional[datetime]]: """ Requests the ETag and Last-Modified headers of a resource via a HEAD request. If no entity tag / modification date can be obtained, the according value will be None. @@ -231,7 +231,6 @@ class HttpCrawler(Crawler): etag_header = resp.headers.get("ETag") last_modified_header = resp.headers.get("Last-Modified") - last_modified = None if last_modified_header: try: @@ -252,23 +251,23 @@ class HttpCrawler(Crawler): self._load_cookies() async with aiohttp.ClientSession( - headers={"User-Agent": f"{NAME}/{VERSION}"}, - cookie_jar=self._cookie_jar, - connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), - timeout=ClientTimeout( - # 30 minutes. No download in the history of downloads was longer than 30 minutes. - # This is enough to transfer a 600 MB file over a 3 Mib/s connection. - # Allowing an arbitrary value could be annoying for overnight batch jobs - total=15 * 60, - connect=self._http_timeout, - sock_connect=self._http_timeout, - sock_read=self._http_timeout, - ), - # See https://github.com/aio-libs/aiohttp/issues/6626 - # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the - # passed signature. Shibboleth will not accept the broken signature and authentication will - # fail. - requote_redirect_url=False, + headers={"User-Agent": f"{NAME}/{VERSION}"}, + cookie_jar=self._cookie_jar, + connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), + timeout=ClientTimeout( + # 30 minutes. No download in the history of downloads was longer than 30 minutes. + # This is enough to transfer a 600 MB file over a 3 Mib/s connection. + # Allowing an arbitrary value could be annoying for overnight batch jobs + total=15 * 60, + connect=self._http_timeout, + sock_connect=self._http_timeout, + sock_read=self._http_timeout, + ), + # See https://github.com/aio-libs/aiohttp/issues/6626 + # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the + # passed signature. Shibboleth will not accept the broken signature and authentication will + # fail. + requote_redirect_url=False ) as session: self.session = session try: diff --git a/PFERD/crawl/ilias/__init__.py b/PFERD/crawl/ilias/__init__.py index fa1aaed..287bd3d 100644 --- a/PFERD/crawl/ilias/__init__.py +++ b/PFERD/crawl/ilias/__init__.py @@ -1,9 +1,5 @@ -from .kit_ilias_web_crawler import ( - IliasWebCrawler, - IliasWebCrawlerSection, - KitIliasWebCrawler, - KitIliasWebCrawlerSection, -) +from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, + KitIliasWebCrawlerSection) __all__ = [ "IliasWebCrawler", diff --git a/PFERD/crawl/ilias/async_helper.py b/PFERD/crawl/ilias/async_helper.py index 2e6b301..5e586b1 100644 --- a/PFERD/crawl/ilias/async_helper.py +++ b/PFERD/crawl/ilias/async_helper.py @@ -1,6 +1,5 @@ import asyncio -from collections.abc import Callable -from typing import Any, Optional +from typing import Any, Callable, Optional import aiohttp @@ -16,9 +15,9 @@ def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Calla try: return await f(*args, **kwargs) except aiohttp.ContentTypeError: # invalid content type - raise CrawlWarning("ILIAS returned an invalid content type") from None + raise CrawlWarning("ILIAS returned an invalid content type") except aiohttp.TooManyRedirects: - raise CrawlWarning("Got stuck in a redirect loop") from None + raise CrawlWarning("Got stuck in a redirect loop") except aiohttp.ClientPayloadError as e: # encoding or not enough bytes last_exception = e except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc. diff --git a/PFERD/crawl/ilias/file_templates.py b/PFERD/crawl/ilias/file_templates.py index c832977..b206461 100644 --- a/PFERD/crawl/ilias/file_templates.py +++ b/PFERD/crawl/ilias/file_templates.py @@ -1,7 +1,5 @@ -import dataclasses -import re from enum import Enum -from typing import Optional, cast +from typing import Optional import bs4 @@ -14,9 +12,7 @@ _link_template_fancy = """ ILIAS - Link: {{name}} - - -
- -
- -
-
- {{name}} -
-
{{description}}
-
- +
+ - +
+
+ {{name}} +
+
{{description}}
+
+
@@ -111,7 +96,6 @@ _link_template_fancy = """ _link_template_internet_shortcut = """ [InternetShortcut] URL={{link}} -Desc={{description}} """.strip() _learning_module_template = """ @@ -142,88 +126,6 @@ _learning_module_template = """ """ -_forum_thread_template = """ - - - - - ILIAS - Forum: {{name}} - - - - {{heading}} - {{content}} - - -""".strip() # noqa: E501 line too long - def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: # Seems to be comments, ignore those. @@ -237,13 +139,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
""" if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): - text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip() + text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() left = f'{text}' else: left = "" if next and body.select_one(".ilc_page_rnav_RightNavigation"): - text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip() + text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() right = f'{text}' else: right = "" @@ -254,29 +156,12 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next ) if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"): - bot_nav.replace_with( - soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode()) + bot_nav.replace_with(soupify(nav_template.replace( + "{{left}}", left).replace("{{right}}", right).encode()) ) - body_str = body.prettify() - return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name) - - -def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str: - if title := heading.find(name="b"): - title.wrap(bs4.Tag(name="a", attrs={"href": url})) - return ( - _forum_thread_template.replace("{{name}}", name) - .replace("{{heading}}", heading.prettify()) - .replace("{{content}}", content.prettify()) - ) - - -@dataclasses.dataclass -class LinkData: - name: str - url: str - description: str + body = body.prettify() + return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) class Links(Enum): @@ -296,9 +181,6 @@ class Links(Enum): return None raise ValueError("Missing switch case") - def collection_as_one(self) -> bool: - return self == Links.FANCY - def extension(self) -> Optional[str]: if self == Links.FANCY: return ".html" @@ -310,47 +192,10 @@ class Links(Enum): return None raise ValueError("Missing switch case") - def interpolate(self, redirect_delay: int, collection_name: str, links: list[LinkData]) -> str: - template = self.template() - if template is None: - raise ValueError("Cannot interpolate ignored links") - - if len(links) == 1: - link = links[0] - content = template - content = content.replace("{{link}}", link.url) - content = content.replace("{{name}}", link.name) - content = content.replace("{{description}}", link.description) - content = content.replace("{{redirect_delay}}", str(redirect_delay)) - return content - if self == Links.PLAINTEXT or self == Links.INTERNET_SHORTCUT: - return "\n".join(f"{link.url}" for link in links) - - # All others get coerced to fancy - content = cast(str, Links.FANCY.template()) - repeated_content = cast( - re.Match[str], re.search(r"([\s\S]+)", content) - ).group(1) - - parts = [] - for link in links: - instance = repeated_content - instance = instance.replace("{{link}}", link.url) - instance = instance.replace("{{name}}", link.name) - instance = instance.replace("{{description}}", link.description) - instance = instance.replace("{{redirect_delay}}", str(redirect_delay)) - parts.append(instance) - - content = content.replace(repeated_content, "\n".join(parts)) - content = content.replace("{{name}}", collection_name) - content = re.sub(r"[\s\S]+", "", content) - - return content - @staticmethod def from_string(string: str) -> "Links": try: return Links(string) except ValueError: - options = [f"'{option.value}'" for option in Links] - raise ValueError(f"must be one of {', '.join(options)}") from None + raise ValueError("must be one of 'ignore', 'plaintext'," + " 'html', 'internet-shortcut'") diff --git a/PFERD/crawl/ilias/ilias_html_cleaner.py b/PFERD/crawl/ilias/ilias_html_cleaner.py index 35a7ea0..e82906f 100644 --- a/PFERD/crawl/ilias/ilias_html_cleaner.py +++ b/PFERD/crawl/ilias/ilias_html_cleaner.py @@ -1,5 +1,3 @@ -from typing import cast - from bs4 import BeautifulSoup, Comment, Tag _STYLE_TAG_CONTENT = """ @@ -39,10 +37,6 @@ _STYLE_TAG_CONTENT = """ margin: 0.5rem 0; } - img { - background-color: white; - } - body { padding: 1em; grid-template-columns: 1fr min(60rem, 90%) 1fr; @@ -60,11 +54,12 @@ _ARTICLE_WORTHY_CLASSES = [ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: head = soup.new_tag("head") soup.insert(0, head) - # Force UTF-8 encoding - head.append(soup.new_tag("meta", charset="utf-8")) + simplecss_link: Tag = soup.new_tag("link") # - head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css")) + simplecss_link["rel"] = "stylesheet" + simplecss_link["href"] = "https://cdn.simplecss.org/simple.css" + head.append(simplecss_link) # Basic style tags for compat style: Tag = soup.new_tag("style") @@ -75,18 +70,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup: - for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)): + for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): block.name = "article" - for block in cast(list[Tag], soup.find_all("h3")): + for block in soup.find_all("h3"): block.name = "div" - for block in cast(list[Tag], soup.find_all("h1")): + for block in soup.find_all("h1"): block.name = "h3" - for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")): + for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): block.name = "h3" - block["class"] += ["accordion-head"] # type: ignore + block["class"] += ["accordion-head"] for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): children = list(dummy.children) @@ -102,7 +97,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup: if figure := video.find_parent("figure"): figure.decompose() - for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")): + for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): hrule_imposter.insert(0, soup.new_tag("hr")) return soup diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py index b5041b3..2fc399d 100644 --- a/PFERD/crawl/ilias/ilias_web_crawler.py +++ b/PFERD/crawl/ilias/ilias_web_crawler.py @@ -4,7 +4,7 @@ import os import re from collections.abc import Awaitable, Coroutine from pathlib import PurePath -from typing import Any, Literal, Optional, cast +from typing import Any, Dict, List, Literal, Optional, Set, Union, cast from urllib.parse import urljoin import aiohttp @@ -15,24 +15,17 @@ from ...auth import Authenticator from ...config import Config from ...logging import ProgressBar, log from ...output_dir import FileSink, Redownload -from ...utils import fmt_path, sanitize_path_name, soupify, url_set_query_param +from ...utils import fmt_path, soupify, url_set_query_param from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical from ..http_crawler import HttpCrawler, HttpCrawlerSection from .async_helper import _iorepeat -from .file_templates import LinkData, Links, forum_thread_template, learning_module_template +from .file_templates import Links, learning_module_template from .ilias_html_cleaner import clean, insert_base_markup -from .kit_ilias_html import ( - IliasElementType, - IliasForumThread, - IliasLearningModulePage, - IliasPage, - IliasPageElement, - IliasSoup, - parse_ilias_forum_export, -) +from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, + IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) from .shibboleth_login import ShibbolethLogin -TargetType = str | int +TargetType = Union[str, int] class LoginTypeLocal: @@ -48,7 +41,7 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return base_url - def login(self) -> Literal["shibboleth"] | LoginTypeLocal: + def login(self) -> Union[Literal["shibboleth"], LoginTypeLocal]: login_type = self.s.get("login_type") if not login_type: self.missing_value("login_type") @@ -62,7 +55,9 @@ class IliasWebCrawlerSection(HttpCrawlerSection): self.invalid_value("login_type", login_type, "Should be ") - def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]: + def tfa_auth( + self, authenticators: Dict[str, Authenticator] + ) -> Optional[Authenticator]: value: Optional[str] = self.s.get("tfa_auth") if value is None: return None @@ -109,10 +104,10 @@ class IliasWebCrawlerSection(HttpCrawlerSection): return self.s.getboolean("forums", fallback=False) -_DIRECTORY_PAGES: set[IliasElementType] = { +_DIRECTORY_PAGES: Set[IliasElementType] = { + IliasElementType.COURSE, IliasElementType.EXERCISE, IliasElementType.EXERCISE_FILES, - IliasElementType.EXERCISE_OVERVIEW, IliasElementType.FOLDER, IliasElementType.INFO_TAB, IliasElementType.MEDIACAST_VIDEO_FOLDER, @@ -121,7 +116,7 @@ _DIRECTORY_PAGES: set[IliasElementType] = { IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, } -_VIDEO_ELEMENTS: set[IliasElementType] = { +_VIDEO_ELEMENTS: Set[IliasElementType] = { IliasElementType.MEDIACAST_VIDEO, IliasElementType.MEDIACAST_VIDEO_FOLDER, IliasElementType.OPENCAST_VIDEO, @@ -171,19 +166,17 @@ class IliasWebCrawler(HttpCrawler): name: str, section: IliasWebCrawlerSection, config: Config, - authenticators: dict[str, Authenticator], + authenticators: Dict[str, Authenticator] ): # Setting a main authenticator for cookie sharing auth = section.auth(authenticators) super().__init__(name, section, config, shared_auth=auth) if section.tasks() > 1: - log.warn( - """ + log.warn(""" Please avoid using too many parallel requests as these are the KIT ILIAS instance's greatest bottleneck. - """.strip() - ) + """.strip()) self._auth = auth self._base_url = section.base_url() @@ -200,7 +193,7 @@ instance's greatest bottleneck. self._links = section.links() self._videos = section.videos() self._forums = section.forums() - self._visited_urls: dict[str, PurePath] = dict() + self._visited_urls: Dict[str, PurePath] = dict() async def _run(self) -> None: if isinstance(self._target, int): @@ -217,23 +210,18 @@ instance's greatest bottleneck. # Start crawling at the given course root_url = url_set_query_param( urljoin(self._base_url + "/", "goto.php"), - "target", - f"crs_{course_id}", + "target", f"crs_{course_id}", ) await self._crawl_url(root_url, expected_id=course_id) async def _crawl_desktop(self) -> None: await self._crawl_url( - urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True + urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show") ) - async def _crawl_url( - self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False - ) -> None: - if awaitable := await self._handle_ilias_page( - url, None, PurePath("."), expected_id, crawl_nested_courses - ): + async def _crawl_url(self, url: str, expected_id: Optional[int] = None) -> None: + if awaitable := await self._handle_ilias_page(url, None, PurePath("."), expected_id): await awaitable async def _handle_ilias_page( @@ -242,7 +230,6 @@ instance's greatest bottleneck. current_element: Optional[IliasPageElement], path: PurePath, expected_course_id: Optional[int] = None, - crawl_nested_courses: bool = False, ) -> Optional[Coroutine[Any, Any, None]]: maybe_cl = await self.crawl(path) if not maybe_cl: @@ -250,9 +237,7 @@ instance's greatest bottleneck. if current_element: self._ensure_not_seen(current_element, path) - return self._crawl_ilias_page( - url, current_element, maybe_cl, expected_course_id, crawl_nested_courses - ) + return self._crawl_ilias_page(url, current_element, maybe_cl, expected_course_id) @anoncritical async def _crawl_ilias_page( @@ -261,11 +246,10 @@ instance's greatest bottleneck. current_element: Optional[IliasPageElement], cl: CrawlToken, expected_course_id: Optional[int] = None, - crawl_nested_courses: bool = False, ) -> None: - elements: list[IliasPageElement] = [] + elements: List[IliasPageElement] = [] # A list as variable redefinitions are not propagated to outer scopes - description: list[BeautifulSoup] = [] + description: List[BeautifulSoup] = [] @_iorepeat(3, "crawling folder") async def gather_elements() -> None: @@ -273,7 +257,6 @@ instance's greatest bottleneck. async with cl: next_stage_url: Optional[str] = url current_parent = current_element - page = None while next_stage_url: soup = await self._get_page(next_stage_url) @@ -283,22 +266,19 @@ instance's greatest bottleneck. # If we expect to find a root course, enforce it if current_parent is None and expected_course_id is not None: perma_link = IliasPage.get_soup_permalink(soup) - if not perma_link or "crs/" not in perma_link: + if not perma_link or "crs_" not in perma_link: raise CrawlError("Invalid course id? Didn't find anything looking like a course") if str(expected_course_id) not in perma_link: raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") - page = IliasPage(soup, current_parent) + page = IliasPage(soup, next_stage_url, current_parent) if next_element := page.get_next_stage_element(): current_parent = next_element next_stage_url = next_element.url else: next_stage_url = None - page = cast(IliasPage, page) elements.extend(page.get_child_elements()) - if current_element is None and (info_tab := page.get_info_tab()): - elements.append(info_tab) if description_string := page.get_description(): description.append(description_string) @@ -310,9 +290,9 @@ instance's greatest bottleneck. elements.sort(key=lambda e: e.id()) - tasks: list[Awaitable[None]] = [] + tasks: List[Awaitable[None]] = [] for element in elements: - if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): + if handle := await self._handle_ilias_element(cl.path, element): tasks.append(asyncio.create_task(handle)) # And execute them @@ -325,30 +305,24 @@ instance's greatest bottleneck. # works correctly. @anoncritical async def _handle_ilias_element( - self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False + self, + parent_path: PurePath, + element: IliasPageElement, ) -> Optional[Coroutine[Any, Any, None]]: # element.name might contain `/` if the crawler created nested elements, # so we can not sanitize it here. We trust in the output dir to thwart worst-case # directory escape attacks. element_path = PurePath(parent_path, element.name) - # This is symptomatic of no access to the element, for example, because - # of time availability restrictions. - if "cmdClass=ilInfoScreenGUI" in element.url and "cmd=showSummary" in element.url: - log.explain( - "Skipping element as url points to info screen, " - "this should only happen with not-yet-released elements" - ) - return None - - if element.type in _VIDEO_ELEMENTS and not self._videos: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](enable with option 'videos')", - ) - return None + if element.type in _VIDEO_ELEMENTS: + if not self._videos: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](enable with option 'videos')" + ) + return None if element.type == IliasElementType.FILE: return await self._handle_file(element, element_path) @@ -358,7 +332,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](enable with option 'forums')", + "[bright_black](enable with option 'forums')" ) return None return await self._handle_forum(element, element_path) @@ -367,7 +341,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](tests contain no relevant data)", + "[bright_black](tests contain no relevant data)" ) return None elif element.type == IliasElementType.SURVEY: @@ -375,7 +349,7 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](surveys contain no relevant data)", + "[bright_black](surveys contain no relevant data)" ) return None elif element.type == IliasElementType.SCORM_LEARNING_MODULE: @@ -383,73 +357,13 @@ instance's greatest bottleneck. "[bold bright_black]", "Ignored", fmt_path(element_path), - "[bright_black](scorm learning modules are not supported)", - ) - return None - elif element.type == IliasElementType.LITERATURE_LIST: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](literature lists are not currently supported)", - ) - return None - elif element.type == IliasElementType.LEARNING_MODULE_HTML: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](HTML learning modules are not supported)", - ) - return None - elif element.type == IliasElementType.BLOG: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](blogs are not currently supported)", - ) - return None - elif element.type == IliasElementType.DCL_RECORD_LIST: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](dcl record lists are not currently supported)", - ) - return None - elif element.type == IliasElementType.MEDIA_POOL: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](media pools are not currently supported)", - ) - return None - elif element.type == IliasElementType.COURSE: - if crawl_nested_courses: - return await self._handle_ilias_page(element.url, element, element_path) - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](not descending into linked course)", - ) - return None - elif element.type == IliasElementType.WIKI: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](wikis are not currently supported)", + "[bright_black](scorm learning modules are not supported)" ) return None elif element.type == IliasElementType.LEARNING_MODULE: return await self._handle_learning_module(element, element_path) elif element.type == IliasElementType.LINK: return await self._handle_link(element, element_path) - elif element.type == IliasElementType.LINK_COLLECTION: - return await self._handle_link(element, element_path) elif element.type == IliasElementType.BOOKING: return await self._handle_booking(element, element_path) elif element.type == IliasElementType.OPENCAST_VIDEO: @@ -475,93 +389,44 @@ instance's greatest bottleneck. log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}") log.explain(f"Links type is {self._links}") - export_url = url_set_query_param(element.url, "cmd", "exportHTML") - resolved = await self._resolve_link_target(export_url) - if resolved == "none": - links = [LinkData(element.name, "", element.description or "")] - else: - links = self._parse_link_content(element, cast(BeautifulSoup, resolved)) - - maybe_extension = self._links.extension() - - if not maybe_extension: + link_template_maybe = self._links.template() + link_extension = self._links.extension() + if not link_template_maybe or not link_extension: log.explain("Answer: No") return None else: log.explain("Answer: Yes") + element_path = element_path.with_name(element_path.name + link_extension) - if len(links) <= 1 or self._links.collection_as_one(): - element_path = element_path.with_name(element_path.name + maybe_extension) - maybe_dl = await self.download(element_path, mtime=element.mtime) - if not maybe_dl: - return None - return self._download_link(self._links, element.name, links, maybe_dl) - - maybe_cl = await self.crawl(element_path) - if not maybe_cl: + maybe_dl = await self.download(element_path, mtime=element.mtime) + if not maybe_dl: return None - # Required for download_all closure - cl = maybe_cl - extension = maybe_extension - async def download_all() -> None: - for link in links: - path = cl.path / (sanitize_path_name(link.name) + extension) - if dl := await self.download(path, mtime=element.mtime): - await self._download_link(self._links, element.name, [link], dl) - - return download_all() + return self._download_link(element, link_template_maybe, maybe_dl) @anoncritical @_iorepeat(3, "resolving link") - async def _download_link( - self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken - ) -> None: + async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None: async with dl as (bar, sink): - rendered = link_renderer.interpolate(self._link_file_redirect_delay, collection_name, links) - sink.file.write(rendered.encode("utf-8")) - sink.done() + export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML") + real_url = await self._resolve_link_target(export_url) + self._write_link_content(link_template, real_url, element.name, element.description, sink) - async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]: - async def impl() -> Optional[BeautifulSoup | Literal["none"]]: - async with self.session.get(export_url, allow_redirects=False) as resp: - # No redirect means we were authenticated - if hdrs.LOCATION not in resp.headers: - return soupify(await resp.read()) # .select_one("a").get("href").strip() # type: ignore - # We are either unauthenticated or the link is not active - new_url = resp.headers[hdrs.LOCATION].lower() - if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: - return "none" - return None - - auth_id = await self._current_auth_id() - target = await impl() - if target is not None: - return target - - await self.authenticate(auth_id) - - target = await impl() - if target is not None: - return target - - raise CrawlError("resolve_link_target failed even after authenticating") - - @staticmethod - def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]: - links = list(content.select("a")) - if len(links) == 1: - url = str(links[0].get("href")).strip() - return [LinkData(name=element.name, description=element.description or "", url=url)] - - results = [] - for link in links: - url = str(link.get("href")).strip() - name = link.get_text(strip=True) - description = cast(Tag, link.find_next_sibling("dd")).get_text(strip=True) - results.append(LinkData(name=name, description=description, url=url.strip())) - - return results + def _write_link_content( + self, + link_template: str, + url: str, + name: str, + description: Optional[str], + sink: FileSink, + ) -> None: + content = link_template + content = content.replace("{{link}}", url) + content = content.replace("{{name}}", name) + content = content.replace("{{description}}", str(description)) + content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay)) + sink.file.write(content.encode("utf-8")) + sink.done() async def _handle_booking( self, @@ -586,7 +451,7 @@ instance's greatest bottleneck. self._ensure_not_seen(element, element_path) - return self._download_booking(element, maybe_dl) + return self._download_booking(element, link_template_maybe, maybe_dl) @anoncritical @_iorepeat(1, "downloading description") @@ -596,10 +461,10 @@ instance's greatest bottleneck. if not dl: return - async with dl as (_bar, sink): + async with dl as (bar, sink): description = clean(insert_base_markup(description)) - description_tag = await self.internalize_images(description) - sink.file.write(description_tag.prettify().encode("utf-8")) + description = await self.internalize_images(description) + sink.file.write(description.prettify().encode("utf-8")) sink.done() @anoncritical @@ -607,13 +472,36 @@ instance's greatest bottleneck. async def _download_booking( self, element: IliasPageElement, + link_template: str, dl: DownloadToken, ) -> None: async with dl as (bar, sink): - links = [LinkData(name=element.name, description=element.description or "", url=element.url)] - rendered = self._links.interpolate(self._link_file_redirect_delay, element.name, links) - sink.file.write(rendered.encode("utf-8")) - sink.done() + self._write_link_content(link_template, element.url, element.name, element.description, sink) + + async def _resolve_link_target(self, export_url: str) -> str: + async def impl() -> Optional[str]: + async with self.session.get(export_url, allow_redirects=False) as resp: + # No redirect means we were authenticated + if hdrs.LOCATION not in resp.headers: + return soupify(await resp.read()).select_one("a").get("href").strip() + # We are either unauthenticated or the link is not active + new_url = resp.headers[hdrs.LOCATION].lower() + if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: + return "" + return None + + auth_id = await self._current_auth_id() + target = await impl() + if target is not None: + return target + + await self.authenticate(auth_id) + + target = await impl() + if target is not None: + return target + + raise CrawlError("resolve_link_target failed even after authenticating") async def _handle_opencast_video( self, @@ -624,7 +512,7 @@ instance's greatest bottleneck. if self.prev_report: self.report.add_custom_value( _get_video_cache_key(element), - self.prev_report.get_custom_value(_get_video_cache_key(element)), + self.prev_report.get_custom_value(_get_video_cache_key(element)) ) # A video might contain other videos, so let's "crawl" the video first @@ -658,7 +546,7 @@ instance's greatest bottleneck. def _previous_contained_opencast_videos( self, element: IliasPageElement, element_path: PurePath - ) -> list[PurePath]: + ) -> List[PurePath]: if not self.prev_report: return [] custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) @@ -696,11 +584,11 @@ instance's greatest bottleneck. def add_to_report(paths: list[str]) -> None: self.report.add_custom_value( _get_video_cache_key(element), - {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}, + {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))} ) async with dl as (bar, sink): - page = IliasPage(await self._get_page(element.url), element) + page = IliasPage(await self._get_page(element.url), element.url, element) stream_elements = page.get_child_elements() if len(stream_elements) > 1: @@ -710,11 +598,11 @@ instance's greatest bottleneck. stream_element = stream_elements[0] # We do not have a local cache yet - await self._stream_from_url(stream_element, sink, bar, is_video=True) + await self._stream_from_url(stream_element.url, sink, bar, is_video=True) add_to_report([str(self._transformer.transform(dl.path))]) return - contained_video_paths: list[str] = [] + contained_video_paths: List[str] = [] for stream_element in stream_elements: video_path = dl.path.parent / stream_element.name @@ -725,7 +613,7 @@ instance's greatest bottleneck. async with maybe_dl as (bar, sink): log.explain(f"Streaming video from real url {stream_element.url}") contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) - await self._stream_from_url(stream_element, sink, bar, is_video=True) + await self._stream_from_url(stream_element.url, sink, bar, is_video=True) add_to_report(contained_video_paths) @@ -747,15 +635,12 @@ instance's greatest bottleneck. async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: assert dl # The function is only reached when dl is not None async with dl as (bar, sink): - await self._stream_from_url(element, sink, bar, is_video) - - async def _stream_from_url( - self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool - ) -> None: - url = element.url + await self._stream_from_url(element.url, sink, bar, is_video) + async def _stream_from_url(self, url: str, sink: FileSink, bar: ProgressBar, is_video: bool) -> None: async def try_stream() -> bool: next_url = url + # Normal files redirect to the magazine if we are not authenticated. As files could be HTML, # we can not match on the content type here. Instead, we disallow redirects and inspect the # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume @@ -803,7 +688,7 @@ instance's greatest bottleneck. await self.authenticate(auth_id) if not await try_stream(): - raise CrawlError(f"File streaming failed after authenticate() {element!r}") + raise CrawlError("File streaming failed after authenticate()") async def _handle_forum( self, @@ -818,23 +703,36 @@ instance's greatest bottleneck. @_iorepeat(3, "crawling forum") @anoncritical async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: + elements: List[IliasForumThread] = [] + async with cl: - inner = IliasPage(await self._get_page(element.url), element) - export_url = inner.get_forum_export_url() - if not export_url: - log.warn("Could not extract forum export url") + next_stage_url = element.url + while next_stage_url: + log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") + log.explain(f"URL: {next_stage_url}") + + soup = await self._get_page(next_stage_url) + page = IliasPage(soup, next_stage_url, element) + + if next := page.get_next_stage_element(): + next_stage_url = next.url + else: + break + + download_data = page.get_download_forum_data() + if not download_data: + raise CrawlWarning("Failed to extract forum data") + if download_data.empty: + log.explain("Forum had no threads") return + html = await self._post_authenticated(download_data.url, download_data.form_data) + elements = parse_ilias_forum_export(soupify(html)) - export = await self._post( - export_url, - {"format": "html", "cmd[createExportFile]": ""}, - ) + elements.sort(key=lambda elem: elem.title) - elements = parse_ilias_forum_export(soupify(export)) - - tasks: list[Awaitable[None]] = [] - for thread in elements: - tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) + tasks: List[Awaitable[None]] = [] + for elem in elements: + tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem))) # And execute them await self.gather(tasks) @@ -842,18 +740,20 @@ instance's greatest bottleneck. @anoncritical @_iorepeat(3, "saving forum thread") async def _download_forum_thread( - self, parent_path: PurePath, thread: IliasForumThread | IliasPageElement, forum_url: str + self, + parent_path: PurePath, + element: IliasForumThread, ) -> None: - path = parent_path / (sanitize_path_name(thread.name) + ".html") - maybe_dl = await self.download(path, mtime=thread.mtime) - if not maybe_dl or not isinstance(thread, IliasForumThread): + path = parent_path / (_sanitize_path_name(element.title) + ".html") + maybe_dl = await self.download(path, mtime=element.mtime) + if not maybe_dl: return async with maybe_dl as (bar, sink): - rendered = forum_thread_template( - thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag) - ) - sink.file.write(rendered.encode("utf-8")) + content = "\n" + content += element.title_tag.prettify() + content += element.content_tag.prettify() + sink.file.write(content.encode("utf-8")) sink.done() async def _handle_learning_module( @@ -871,33 +771,33 @@ instance's greatest bottleneck. @_iorepeat(3, "crawling learning module") @anoncritical async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: list[IliasLearningModulePage] = [] + elements: List[IliasLearningModulePage] = [] async with cl: log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain(f"URL: {element.url}") soup = await self._get_page(element.url) - page = IliasPage(soup, element) + page = IliasPage(soup, element.url, element) if next := page.get_learning_module_data(): - elements.extend( - await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element) - ) + elements.extend(await self._crawl_learning_module_direction( + cl.path, next.previous_url, "left", element + )) elements.append(next) - elements.extend( - await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element) - ) + elements.extend(await self._crawl_learning_module_direction( + cl.path, next.next_url, "right", element + )) # Reflect their natural ordering in the file names for index, lm_element in enumerate(elements): lm_element.title = f"{index:02}_{lm_element.title}" - tasks: list[Awaitable[None]] = [] + tasks: List[Awaitable[None]] = [] for index, elem in enumerate(elements): prev_url = elements[index - 1].title if index > 0 else None next_url = elements[index + 1].title if index < len(elements) - 1 else None - tasks.append( - asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url)) - ) + tasks.append(asyncio.create_task( + self._download_learning_module_page(cl.path, elem, prev_url, next_url) + )) # And execute them await self.gather(tasks) @@ -906,10 +806,10 @@ instance's greatest bottleneck. self, path: PurePath, start_url: Optional[str], - dir: Literal["left"] | Literal["right"], - parent_element: IliasPageElement, - ) -> list[IliasLearningModulePage]: - elements: list[IliasLearningModulePage] = [] + dir: Union[Literal["left"], Literal["right"]], + parent_element: IliasPageElement + ) -> List[IliasLearningModulePage]: + elements: List[IliasLearningModulePage] = [] if not start_url: return elements @@ -920,10 +820,13 @@ instance's greatest bottleneck. log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain(f"URL: {next_element_url}") soup = await self._get_page(next_element_url) - page = IliasPage(soup, parent_element) + page = IliasPage(soup, next_element_url, parent_element) if next := page.get_learning_module_data(): elements.append(next) - next_element_url = next.previous_url if dir == "left" else next.next_url + if dir == "left": + next_element_url = next.previous_url + else: + next_element_url = next.next_url counter += 1 return elements @@ -935,9 +838,9 @@ instance's greatest bottleneck. parent_path: PurePath, element: IliasLearningModulePage, prev: Optional[str], - next: Optional[str], + next: Optional[str] ) -> None: - path = parent_path / (sanitize_path_name(element.title) + ".html") + path = parent_path / (_sanitize_path_name(element.title) + ".html") maybe_dl = await self.download(path) if not maybe_dl: return @@ -946,11 +849,17 @@ instance's greatest bottleneck. return if prev: - prev_p = self._transformer.transform(parent_path / (sanitize_path_name(prev) + ".html")) - prev = os.path.relpath(prev_p, my_path.parent) if prev_p else None + prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) + if prev_p: + prev = os.path.relpath(prev_p, my_path.parent) + else: + prev = None if next: - next_p = self._transformer.transform(parent_path / (sanitize_path_name(next) + ".html")) - next = os.path.relpath(next_p, my_path.parent) if next_p else None + next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) + if next_p: + next = os.path.relpath(next_p, my_path.parent) + else: + next = None async with maybe_dl as (bar, sink): content = element.content @@ -964,16 +873,19 @@ instance's greatest bottleneck. """ log.explain_topic("Internalizing images") for elem in tag.find_all(recursive=True): - if elem.name == "img" and (src := elem.attrs.get("src", None)): - url = urljoin(self._base_url, cast(str, src)) - if not url.startswith(self._base_url): - continue - log.explain(f"Internalizing {url!r}") - img = await self._get_authenticated(url) - elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() - if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"): + if not isinstance(elem, Tag): + continue + if elem.name == "img": + if src := elem.attrs.get("src", None): + url = urljoin(self._base_url, src) + if not url.startswith(self._base_url): + continue + log.explain(f"Internalizing {url!r}") + img = await self._get_authenticated(url) + elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() + if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"): # For unknown reasons the protocol seems to be stripped. - elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"]) + elem.attrs["src"] = "https:" + elem.attrs["src"] return tag def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None: @@ -985,10 +897,10 @@ instance's greatest bottleneck. ) self._visited_urls[element.url] = parent_path - async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup: + async def _get_page(self, url: str, root_page_allowed: bool = False) -> BeautifulSoup: auth_id = await self._current_auth_id() async with self.session.get(url) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) + soup = soupify(await request.read()) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) @@ -997,13 +909,13 @@ instance's greatest bottleneck. # Retry once after authenticating. If this fails, we will die. async with self.session.get(url) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) + soup = soupify(await request.read()) if IliasPage.is_logged_in(soup): return self._verify_page(soup, url, root_page_allowed) raise CrawlError(f"get_page failed even after authenticating on {url!r}") @staticmethod - def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup: + def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: if IliasPage.is_root_page(soup) and not root_page_allowed: raise CrawlError( "Unexpectedly encountered ILIAS root page. " @@ -1015,15 +927,29 @@ instance's greatest bottleneck. ) return soup - async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes: + async def _post_authenticated( + self, + url: str, + data: dict[str, Union[str, List[str]]] + ) -> bytes: + auth_id = await self._current_auth_id() + form_data = aiohttp.FormData() for key, val in data.items(): form_data.add_field(key, val) - async with self.session.post(url, data=form_data()) as request: + async with self.session.post(url, data=form_data(), allow_redirects=False) as request: if request.status == 200: return await request.read() - raise CrawlError(f"post failed with status {request.status}") + + # We weren't authenticated, so try to do that + await self.authenticate(auth_id) + + # Retry once after authenticating. If this fails, we will die. + async with self.session.post(url, data=data, allow_redirects=False) as request: + if request.status == 200: + return await request.read() + raise CrawlError("post_authenticated failed even after authenticating") async def _get_authenticated(self, url: str) -> bytes: auth_id = await self._current_auth_id() @@ -1053,22 +979,52 @@ instance's greatest bottleneck. async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: login_page = soupify(await request.read()) - login_form = login_page.find("form", attrs={"name": "login_form"}) + login_form = login_page.find("form", attrs={"name": "formlogin"}) if login_form is None: raise CrawlError("Could not find the login form! Specified client id might be invalid.") - login_url = cast(Optional[str], login_form.attrs.get("action")) + login_url = login_form.attrs.get("action") if login_url is None: raise CrawlError("Could not find the action URL in the login form!") username, password = await self._auth.credentials() - login_form_data = aiohttp.FormData() - login_form_data.add_field("login_form/input_3/input_4", username) - login_form_data.add_field("login_form/input_3/input_5", password) + login_data = { + "username": username, + "password": password, + "cmd[doStandardAuthentication]": "Login", + } # do the actual login - async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) - if not IliasPage.is_logged_in(soup): + async with self.session.post(urljoin(self._base_url, login_url), data=login_data) as request: + soup = soupify(await request.read()) + if not self._is_logged_in(soup): self._auth.invalidate_credentials() + + @staticmethod + def _is_logged_in(soup: BeautifulSoup) -> bool: + # Normal ILIAS pages + mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar") + if mainbar is not None: + login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x}) + shib_login = soup.find(id="button_shib_login") + return not login_button and not shib_login + + # Personal Desktop + if soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}): + return True + + # Video listing embeds do not have complete ILIAS html. Try to match them by + # their video listing table + video_table = soup.find( + recursive=True, + name="table", + attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")} + ) + if video_table is not None: + return True + # The individual video player wrapper page has nothing of the above. + # Match it by its playerContainer. + if soup.select_one("#playerContainer") is not None: + return True + return False diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 5966141..57c81e5 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -1,117 +1,30 @@ import json import re -from collections.abc import Callable from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Optional, cast +from typing import Dict, List, Optional, Union, cast from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup, Tag -from PFERD.crawl import CrawlError -from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log -from PFERD.utils import sanitize_path_name, url_set_query_params +from PFERD.utils import url_set_query_params -TargetType = str | int - - -class TypeMatcher: - class UrlPath: - path: str - - def __init__(self, path: str): - self.path = path - - class UrlParameter: - query: str - - def __init__(self, query: str): - self.query = query - - class ImgSrc: - src: str - - def __init__(self, src: str): - self.src = src - - class ImgAlt: - alt: str - - def __init__(self, alt: str): - self.alt = alt - - class All: - matchers: list["IliasElementMatcher"] - - def __init__(self, matchers: list["IliasElementMatcher"]): - self.matchers = matchers - - class Any: - matchers: list["IliasElementMatcher"] - - def __init__(self, matchers: list["IliasElementMatcher"]): - self.matchers = matchers - - @staticmethod - def path(path: str) -> UrlPath: - return TypeMatcher.UrlPath(path) - - @staticmethod - def query(query: str) -> UrlParameter: - return TypeMatcher.UrlParameter(query) - - @staticmethod - def img_src(src: str) -> ImgSrc: - return TypeMatcher.ImgSrc(src) - - @staticmethod - def img_alt(alt: str) -> ImgAlt: - return TypeMatcher.ImgAlt(alt) - - @staticmethod - def all(*matchers: "IliasElementMatcher") -> All: - return TypeMatcher.All(list(matchers)) - - @staticmethod - def any(*matchers: "IliasElementMatcher") -> Any: - return TypeMatcher.Any(list(matchers)) - - @staticmethod - def never() -> Any: - return TypeMatcher.Any([]) - - -IliasElementMatcher = ( - TypeMatcher.UrlPath - | TypeMatcher.UrlParameter - | TypeMatcher.ImgSrc - | TypeMatcher.ImgAlt - | TypeMatcher.All - | TypeMatcher.Any -) +TargetType = Union[str, int] class IliasElementType(Enum): - BLOG = "blog" BOOKING = "booking" COURSE = "course" - DCL_RECORD_LIST = "dcl_record_list" - EXERCISE_OVERVIEW = "exercise_overview" - EXERCISE = "exercise" # own submitted files + EXERCISE = "exercise" EXERCISE_FILES = "exercise_files" # own submitted files FILE = "file" FOLDER = "folder" FORUM = "forum" - FORUM_THREAD = "forum_thread" INFO_TAB = "info_tab" LEARNING_MODULE = "learning_module" - LEARNING_MODULE_HTML = "learning_module_html" - LITERATURE_LIST = "literature_list" LINK = "link" - LINK_COLLECTION = "link_collection" - MEDIA_POOL = "media_pool" MEDIACAST_VIDEO = "mediacast_video" MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" MEETING = "meeting" @@ -123,123 +36,6 @@ class IliasElementType(Enum): SCORM_LEARNING_MODULE = "scorm_learning_module" SURVEY = "survey" TEST = "test" # an online test. Will be ignored currently. - WIKI = "wiki" - - def matcher(self) -> IliasElementMatcher: - match self: - case IliasElementType.BLOG: - return TypeMatcher.any(TypeMatcher.img_src("_blog.svg")) - case IliasElementType.BOOKING: - return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg")) - case IliasElementType.COURSE: - return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg")) - case IliasElementType.DCL_RECORD_LIST: - return TypeMatcher.any( - TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui") - ) - case IliasElementType.EXERCISE: - return TypeMatcher.never() - case IliasElementType.EXERCISE_FILES: - return TypeMatcher.never() - case IliasElementType.EXERCISE_OVERVIEW: - return TypeMatcher.any( - TypeMatcher.path("/exc/"), - TypeMatcher.path("_exc_"), - TypeMatcher.img_src("_exc.svg"), - ) - case IliasElementType.FILE: - return TypeMatcher.any( - TypeMatcher.query("cmd=sendfile"), - TypeMatcher.path("_file_"), - TypeMatcher.img_src("/filedelivery/"), - ) - case IliasElementType.FOLDER: - return TypeMatcher.any( - TypeMatcher.path("/fold/"), - TypeMatcher.img_src("_fold.svg"), - TypeMatcher.path("/grp/"), - TypeMatcher.img_src("_grp.svg"), - TypeMatcher.path("/copa/"), - TypeMatcher.path("_copa_"), - TypeMatcher.img_src("_copa.svg"), - # Not supported right now but warn users - # TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), - # TypeMatcher.img_alt("medienpool"), - # TypeMatcher.img_src("_mep.svg"), - ) - case IliasElementType.FORUM: - return TypeMatcher.any( - TypeMatcher.path("/frm/"), - TypeMatcher.path("_frm_"), - TypeMatcher.img_src("_frm.svg"), - ) - case IliasElementType.FORUM_THREAD: - return TypeMatcher.never() - case IliasElementType.INFO_TAB: - return TypeMatcher.never() - case IliasElementType.LITERATURE_LIST: - return TypeMatcher.img_src("_bibl.svg") - case IliasElementType.LEARNING_MODULE: - return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg")) - case IliasElementType.LEARNING_MODULE_HTML: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg") - ) - case IliasElementType.LINK: - return TypeMatcher.any( - TypeMatcher.all( - TypeMatcher.query("baseclass=illinkresourcehandlergui"), - TypeMatcher.query("calldirectlink"), - ), - TypeMatcher.img_src("_webr.svg"), # duplicated :( - ) - case IliasElementType.LINK_COLLECTION: - return TypeMatcher.any( - TypeMatcher.query("baseclass=illinkresourcehandlergui"), - TypeMatcher.img_src("_webr.svg"), # duplicated :( - ) - case IliasElementType.MEDIA_POOL: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg") - ) - case IliasElementType.MEDIACAST_VIDEO: - return TypeMatcher.never() - case IliasElementType.MEDIACAST_VIDEO_FOLDER: - return TypeMatcher.any( - TypeMatcher.path("/mcst/"), - TypeMatcher.query("baseclass=ilmediacasthandlergui"), - TypeMatcher.img_src("_mcst.svg"), - ) - case IliasElementType.MEETING: - return TypeMatcher.any(TypeMatcher.img_src("_sess.svg")) - case IliasElementType.MOB_VIDEO: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO_FOLDER: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: - return TypeMatcher.img_alt("opencast") - case IliasElementType.OPENCAST_VIDEO_PLAYER: - return TypeMatcher.never() - case IliasElementType.SCORM_LEARNING_MODULE: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg") - ) - case IliasElementType.SURVEY: - return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg")) - case IliasElementType.TEST: - return TypeMatcher.any( - TypeMatcher.query("cmdclass=ilobjtestgui"), - TypeMatcher.query("cmdclass=iltestscreengui"), - TypeMatcher.img_src("_tst.svg"), - ) - case IliasElementType.WIKI: - return TypeMatcher.any( - TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg") - ) - - raise CrawlWarning(f"Unknown matcher {self}") @dataclass @@ -253,25 +49,14 @@ class IliasPageElement: def id(self) -> str: regexes = [ r"eid=(?P[0-9a-z\-]+)", - r"book/(?P\d+)", # booking - r"cat/(?P\d+)", - r"copa/(?P\d+)", # content page - r"crs/(?P\d+)", # course - r"exc/(?P\d+)", # exercise - r"file/(?P\d+)", # file - r"fold/(?P\d+)", # folder - r"frm/(?P\d+)", # forum - r"grp/(?P\d+)", # group - r"lm/(?P\d+)", # learning module - r"mcst/(?P\d+)", # mediacast - r"pg/(?P(\d|_)+)", # page? - r"svy/(?P\d+)", # survey - r"sess/(?P\d+)", # session - r"webr/(?P\d+)", # web referene (link) - r"thr_pk=(?P\d+)", # forums + r"file_(?P\d+)", + r"copa_(?P\d+)", + r"fold_(?P\d+)", + r"frm_(?P\d+)", + r"exc_(?P\d+)", r"ref_id=(?P\d+)", r"target=[a-z]+_(?P\d+)", - r"mm_(?P\d+)", + r"mm_(?P\d+)" ] for regex in regexes: @@ -289,15 +74,15 @@ class IliasPageElement: name: str, mtime: Optional[datetime] = None, description: Optional[str] = None, - skip_sanitize: bool = False, - ) -> "IliasPageElement": + skip_sanitize: bool = False + ) -> 'IliasPageElement': if typ == IliasElementType.MEETING: normalized = IliasPageElement._normalize_meeting_name(name) log.explain(f"Normalized meeting name from {name!r} to {normalized!r}") name = normalized if not skip_sanitize: - name = sanitize_path_name(name) + name = _sanitize_path_name(name) return IliasPageElement(typ, url, name, mtime, description) @@ -309,7 +94,7 @@ class IliasPageElement: """ # This checks whether we can reach a `:` without passing a `-` - if re.search(r"^[^-]+: ", meeting_name): # noqa: SIM108 + if re.search(r"^[^-]+: ", meeting_name): # Meeting name only contains date: "05. Jan 2000:" split_delimiter = ":" else: @@ -332,14 +117,14 @@ class IliasPageElement: @dataclass class IliasDownloadForumData: url: str - form_data: dict[str, str | list[str]] + form_data: Dict[str, Union[str, List[str]]] empty: bool @dataclass class IliasForumThread: - name: str - name_tag: Tag + title: str + title_tag: Tag content_tag: Tag mtime: Optional[datetime] @@ -352,30 +137,21 @@ class IliasLearningModulePage: previous_url: Optional[str] -class IliasSoup: - soup: BeautifulSoup - page_url: str - - def __init__(self, soup: BeautifulSoup, page_url: str): - self.soup = soup - self.page_url = page_url - - class IliasPage: - def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]): - self._ilias_soup = ilias_soup - self._soup = ilias_soup.soup - self._page_url = ilias_soup.page_url + + def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): + self._soup = soup + self._page_url = _page_url self._page_type = source_element.type if source_element else None self._source_name = source_element.name if source_element else "" @staticmethod - def is_root_page(soup: IliasSoup) -> bool: + def is_root_page(soup: BeautifulSoup) -> bool: if permalink := IliasPage.get_soup_permalink(soup): - return "goto.php/root/" in permalink + return "goto.php?target=root_" in permalink return False - def get_child_elements(self) -> list[IliasPageElement]: + def get_child_elements(self) -> List[IliasPageElement]: """ Return all child page elements you can find here. """ @@ -402,25 +178,22 @@ class IliasPage: def get_info_tab(self) -> Optional[IliasPageElement]: tab: Optional[Tag] = self._soup.find( - name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x} + name="a", + attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x} ) if tab is not None: return IliasPageElement.create_new( - IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos" + IliasElementType.INFO_TAB, + self._abs_url_from_link(tab), + "infos" ) return None def get_description(self) -> Optional[BeautifulSoup]: - def is_interesting_class(name: str | None) -> bool: - return name in [ - "ilCOPageSection", - "ilc_Paragraph", - "ilc_va_ihcap_VAccordIHeadCap", - "ilc_va_ihcap_AccordIHeadCap", - "ilc_media_cont_MediaContainer", - ] + def is_interesting_class(name: str) -> bool: + return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] - paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class)) + paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class) if not paragraphs: return None @@ -431,20 +204,6 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue - if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")): - # We have an embedded video which should be downloaded by _find_mob_videos - url, title = self._find_mob_video_url_title(video, p) - raw_html += '
External Video: {title}' - else: - raw_html += f"Video elided. Filename: '{title}'." - raw_html += "
\n" - continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -458,13 +217,13 @@ class IliasPage: def get_learning_module_data(self) -> Optional[IliasLearningModulePage]: if not self._is_learning_module_page(): return None - content = cast(Tag, self._soup.select_one("#ilLMPageContent")) - title = cast(Tag, self._soup.select_one(".ilc_page_title_PageTitle")).get_text().strip() + content = self._soup.select_one("#ilLMPageContent") + title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip() return IliasLearningModulePage( title=title, content=content, next_url=self._find_learning_module_next(), - previous_url=self._find_learning_module_prev(), + previous_url=self._find_learning_module_prev() ) def _find_learning_module_next(self) -> Optional[str]: @@ -483,28 +242,29 @@ class IliasPage: return url return None - def get_forum_export_url(self) -> Optional[str]: - forum_link = self._soup.select_one("#tab_forums_threads > a") - if not forum_link: - log.explain("Found no forum link") + def get_download_forum_data(self) -> Optional[IliasDownloadForumData]: + form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x}) + if not form: return None + post_url = self._abs_url_from_relative(form["action"]) - base_url = self._abs_url_from_link(forum_link) - base_url = re.sub(r"cmd=\w+", "cmd=post", base_url) - base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url) + thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})] - rtoken_form = self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}) - if not rtoken_form: - log.explain("Found no rtoken anywhere") - return None - match = cast(re.Match[str], re.search(r"rtoken=(\w+)", str(rtoken_form.attrs["action"]))) - rtoken = match.group(1) + form_data: Dict[str, Union[str, List[str]]] = { + "thread_ids[]": thread_ids, + "selected_cmd2": "html", + "select_cmd2": "Ausführen", + "selected_cmd": "", + } - base_url = base_url + "&rtoken=" + rtoken - - return base_url + return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0) def get_next_stage_element(self) -> Optional[IliasPageElement]: + if self._is_forum_page(): + if "trows=800" in self._page_url: + return None + log.explain("Requesting *all* forum threads") + return self._get_show_max_forum_entries_per_page_url() if self._is_ilias_opencast_embedding(): log.explain("Unwrapping opencast embedding") return self.get_child_elements()[0] @@ -514,8 +274,6 @@ class IliasPage: if self._contains_collapsed_future_meetings(): log.explain("Requesting *all* future meetings") return self._uncollapse_future_meetings_url() - if self._is_exercise_not_all_shown(): - return self._show_all_exercises() if not self._is_content_tab_selected(): if self._page_type != IliasElementType.INFO_TAB: log.explain("Selecting content tab") @@ -524,6 +282,13 @@ class IliasPage: log.explain("Crawling info tab, skipping content select") return None + def _is_forum_page(self) -> bool: + read_more_btn = self._soup.find( + "button", + attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x} + ) + return read_more_btn is not None + def _is_video_player(self) -> bool: return "paella_config_file" in str(self._soup) @@ -532,26 +297,28 @@ class IliasPage: return True # Raw listing without ILIAS fluff - video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + video_element_table: Tag = self._soup.find( + name="table", id=re.compile(r"tbl_xoct_.+") + ) return video_element_table is not None def _is_ilias_opencast_embedding(self) -> bool: # ILIAS fluff around the real opencast html if self._soup.find(id="headerimage"): - element: Tag = cast(Tag, self._soup.find(id="headerimage")) - if "opencast" in cast(str, element.attrs["src"]).lower(): + element: Tag = self._soup.find(id="headerimage") + if "opencast" in element.attrs["src"].lower(): return True return False def _is_exercise_file(self) -> bool: # we know it from before - if self._page_type == IliasElementType.EXERCISE_OVERVIEW: + if self._page_type == IliasElementType.EXERCISE: return True # We have no suitable parent - let's guesss if self._soup.find(id="headerimage"): - element: Tag = cast(Tag, self._soup.find(id="headerimage")) - if "exc" in cast(str, element.attrs["src"]).lower(): + element: Tag = self._soup.find(id="headerimage") + if "exc" in element.attrs["src"].lower(): return True return False @@ -561,7 +328,7 @@ class IliasPage: def _is_content_page(self) -> bool: if link := self.get_permalink(): - return "/copa/" in link + return "target=copa_" in link return False def _is_learning_module_page(self) -> bool: @@ -575,23 +342,13 @@ class IliasPage: def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]: element = self._soup.find( "a", - attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}, + attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)} ) if not element: return None link = self._abs_url_from_link(element) return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings") - def _is_exercise_not_all_shown(self) -> bool: - return ( - self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower() - ) - - def _show_all_exercises(self) -> Optional[IliasPageElement]: - return IliasPageElement.create_new( - IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises" - ) - def _is_content_tab_selected(self) -> bool: return self._select_content_page_url() is None @@ -604,28 +361,31 @@ class IliasPage: def _select_content_page_url(self) -> Optional[IliasPageElement]: tab = self._soup.find( - id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x} + id="tab_view_content", + attrs={"class": lambda x: x is not None and "active" not in x} ) # Already selected (or not found) if not tab: return None link = tab.find("a") if link: - link_str = self._abs_url_from_link(link) - return IliasPageElement.create_new(IliasElementType.FOLDER, link_str, "select content page") + link = self._abs_url_from_link(link) + return IliasPageElement.create_new(IliasElementType.FOLDER, link, "select content page") _unexpected_html_warning() log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.") log.warn_contd("PFERD might not find content on the course's main page.") return None - def _player_to_video(self) -> list[IliasPageElement]: + def _player_to_video(self) -> List[IliasPageElement]: # Fetch the actual video page. This is a small wrapper page initializing a javscript # player. Sadly we can not execute that JS. The actual video stream url is nowhere # on the page, but defined in a JS object inside a script tag, passed to the player # library. # We do the impossible and RegEx the stream JSON object out of the page's HTML source - regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE) + regex = re.compile( + r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE + ) json_match = regex.search(str(self._soup)) if json_match is None: @@ -653,77 +413,61 @@ class IliasPage: return items - def _get_show_max_forum_entries_per_page_url( - self, wanted_max: Optional[int] = None - ) -> Optional[IliasPageElement]: + def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]: correct_link = self._soup.find( - "a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x} + "a", + attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x} ) if not correct_link: return None link = self._abs_url_from_link(correct_link) - if wanted_max is not None: - link = link.replace("trows=800", f"trows={wanted_max}") return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads") - def _get_forum_thread_count(self) -> Optional[int]: - log.explain_topic("Trying to find forum thread count") + def _find_personal_desktop_entries(self) -> List[IliasPageElement]: + items: List[IliasPageElement] = [] - candidates = cast(list[Tag], self._soup.select(".ilTableFootLight")) - extract_regex = re.compile(r"\s(?P\d+)\s*\)") - - for candidate in candidates: - log.explain(f"Found thread count candidate: {candidate}") - if match := extract_regex.search(candidate.get_text()): - return int(match.group("max")) - else: - log.explain("Found no candidates to extract thread count from") - - return None - - def _find_personal_desktop_entries(self) -> list[IliasPageElement]: - items: list[IliasPageElement] = [] - - titles: list[Tag] = self._soup.select("#block_pditems_0 .il-item-title") + titles: List[Tag] = self._soup.select("#block_pditems_0 .il-item-title") for title in titles: link = title.find("a") if not link: - log.explain(f"Skipping offline item: {title.get_text().strip()!r}") + log.explain(f"Skipping offline item: {title.getText().strip()!r}") continue - name = sanitize_path_name(link.text.strip()) + name = _sanitize_path_name(link.text.strip()) url = self._abs_url_from_link(link) if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url: # Configure button/link does not have anything interesting continue - typ = IliasPage._find_type_for_element( - name, url, lambda: IliasPage._find_icon_for_folder_entry(cast(Tag, link)) - ) - if not typ: + type = self._find_type_from_link(name, link, url) + if not type: _unexpected_html_warning() log.warn_contd(f"Could not extract type for {link}") continue - log.explain(f"Found {name!r} of type {typ}") + log.explain(f"Found {name!r}") - items.append(IliasPageElement.create_new(typ, url, name)) + if type == IliasElementType.FILE and "_download" not in url: + url = re.sub(r"(target=file_\d+)", r"\1_download", url) + log.explain("Rewired file URL to include download part") + + items.append(IliasPageElement.create_new(type, url, name)) return items - def _find_copa_entries(self) -> list[IliasPageElement]: - items: list[IliasPageElement] = [] - links: list[Tag] = cast(list[Tag], self._soup.find_all(class_="ilc_flist_a_FileListItemLink")) + def _find_copa_entries(self) -> List[IliasPageElement]: + items: List[IliasPageElement] = [] + links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink") for link in links: url = self._abs_url_from_link(link) - name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.get_text()).strip().replace("\t", "") - name = sanitize_path_name(name) + name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "") + name = _sanitize_path_name(name) if "file_id" not in url: _unexpected_html_warning() @@ -734,26 +478,24 @@ class IliasPage: return items - def _find_info_tab_entries(self) -> list[IliasPageElement]: + def _find_info_tab_entries(self) -> List[IliasPageElement]: items = [] - links: list[Tag] = self._soup.select("a.il_ContainerItemCommand") + links: List[Tag] = self._soup.select("a.il_ContainerItemCommand") for link in links: - log.explain(f"Found info tab link: {self._abs_url_from_link(link)}") - if "cmdclass=ilobjcoursegui" not in cast(str, link["href"]).lower(): + if "cmdClass=ilobjcoursegui" not in link["href"]: continue - if "cmd=sendfile" not in cast(str, link["href"]).lower(): + if "cmd=sendfile" not in link["href"]: continue - items.append( - IliasPageElement.create_new( - IliasElementType.FILE, self._abs_url_from_link(link), sanitize_path_name(link.get_text()) - ) - ) + items.append(IliasPageElement.create_new( + IliasElementType.FILE, + self._abs_url_from_link(link), + _sanitize_path_name(link.getText()) + )) - log.explain(f"Found {len(items)} info tab entries {items}") return items - def _find_opencast_video_entries(self) -> list[IliasPageElement]: + def _find_opencast_video_entries(self) -> List[IliasPageElement]: # ILIAS has three stages for video pages # 1. The initial dummy page without any videos. This page contains the link to the listing # 2. The video listing which might be paginated @@ -761,12 +503,14 @@ class IliasPage: # # We need to figure out where we are. - video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + video_element_table: Tag = self._soup.find( + name="table", id=re.compile(r"tbl_xoct_.+") + ) if video_element_table is None: # We are in stage 1 # The page is actually emtpy but contains the link to stage 2 - content_link: Tag = cast(Tag, self._soup.select_one("#tab_series a")) + content_link: Tag = self._soup.select_one("#tab_series a") url: str = self._abs_url_from_link(content_link) query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(url, query_params) @@ -777,42 +521,43 @@ class IliasPage: is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER: + if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: # We are in stage 2 - try to break pagination return self._find_opencast_video_entries_paginated() return self._find_opencast_video_entries_no_paging() - def _find_opencast_video_entries_paginated(self) -> list[IliasPageElement]: - table_element = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]: + table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if table_element is None: log.warn("Couldn't increase elements per page (table not found). I might miss elements.") return self._find_opencast_video_entries_no_paging() - id_match = re.match(r"tbl_xoct_(.+)", cast(str, table_element.attrs["id"])) + id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) if id_match is None: log.warn("Couldn't increase elements per page (table id not found). I might miss elements.") return self._find_opencast_video_entries_no_paging() table_id = id_match.group(1) - query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + query_params = {f"tbl_xoct_{table_id}_trows": "800", + "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(self._page_url, query_params) log.explain("Disabled pagination, retrying folder as a new entry") return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] - def _find_opencast_video_entries_no_paging(self) -> list[IliasPageElement]: + def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: """ Crawls the "second stage" video page. This page contains the actual video urls. """ # Video start links are marked with an "Abspielen" link - video_links = cast( - list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")) + video_links: List[Tag] = self._soup.findAll( + name="a", text=re.compile(r"\s*(Abspielen|Play)\s*") ) - results: list[IliasPageElement] = [] + results: List[IliasPageElement] = [] for link in video_links: results.append(self._listed_opencast_video_to_element(link)) @@ -824,10 +569,12 @@ class IliasPage: # 6th or 7th child (1 indexed) is the modification time string. Try to find it # by parsing backwards from the end and finding something that looks like a date modification_time = None - row: Tag = link.parent.parent.parent # type: ignore + row: Tag = link.parent.parent.parent column_count = len(row.select("td.std")) for index in range(column_count, 0, -1): - modification_string = cast(Tag, row.select_one(f"td.std:nth-child({index})")).get_text().strip() + modification_string = link.parent.parent.parent.select_one( + f"td.std:nth-child({index})" + ).getText().strip() if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string): modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M") break @@ -836,10 +583,10 @@ class IliasPage: log.warn(f"Could not determine upload time for {link}") modification_time = datetime.now() - title = cast(Tag, row.select_one("td.std:nth-child(3)")).get_text().strip() + title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() title += ".mp4" - video_name: str = sanitize_path_name(title) + video_name: str = _sanitize_path_name(title) video_url = self._abs_url_from_link(link) @@ -848,133 +595,114 @@ class IliasPage: IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time ) - def _find_exercise_entries(self) -> list[IliasPageElement]: + def _find_exercise_entries(self) -> List[IliasPageElement]: if self._soup.find(id="tab_submission"): - log.explain("Found submission tab. This is an exercise detail or files page") - if self._soup.select_one("#tab_submission.active") is None: - log.explain(" This is a details page") - return self._find_exercise_entries_detail_page() - else: - log.explain(" This is a files page") - return self._find_exercise_entries_files_page() - + log.explain("Found submission tab. This is an exercise detail page") + return self._find_exercise_entries_detail_page() log.explain("Found no submission tab. This is an exercise root page") return self._find_exercise_entries_root_page() - def _find_exercise_entries_detail_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] + def _find_exercise_entries_detail_page(self) -> List[IliasPageElement]: + results: List[IliasPageElement] = [] - if link := self._soup.select_one("#tab_submission > a"): - results.append( - IliasPageElement.create_new( - IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission" - ) - ) - else: - log.explain("Found no submission link for exercise, maybe it has not started yet?") - - # Find all download links in the container (this will contain all the *feedback* files) - download_links = cast( - list[Tag], - self._soup.find_all( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x is not None and "cmd=download" in x}, - text="Download", - ), + # Find all download links in the container (this will contain all the files) + download_links: List[Tag] = self._soup.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmd=download" in x}, + text="Download" ) for link in download_links: - parent_row: Tag = cast( - Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x}) - ) - name_tag = parent_row.find(name="div") + parent_row: Tag = link.findParent("tr") + children: List[Tag] = parent_row.findChildren("td") - if not name_tag: - log.warn("Could not find name tag for exercise entry") - _unexpected_html_warning() - continue - - name = sanitize_path_name(name_tag.get_text().strip()) + name = _sanitize_path_name(children[1].getText().strip()) log.explain(f"Found exercise detail entry {name!r}") - results.append( - IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name) - ) - - return results - - def _find_exercise_entries_files_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] - - # Find all download links in the container - download_links = cast( - list[Tag], - self._soup.find_all( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x is not None and "cmd=download" in x}, - text="Download", - ), - ) - - for link in download_links: - parent_row: Tag = cast(Tag, link.find_parent("tr")) - children = cast(list[Tag], parent_row.find_all("td")) - - name = sanitize_path_name(children[1].get_text().strip()) - log.explain(f"Found exercise file entry {name!r}") - - date = None for child in reversed(children): - date = demangle_date(child.get_text().strip(), fail_silently=True) + date = demangle_date(child.getText().strip(), fail_silently=True) if date is not None: break if date is None: - log.warn(f"Date parsing failed for exercise file entry {name!r}") + log.warn(f"Date parsing failed for exercise entry {name!r}") - results.append( - IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date) - ) + results.append(IliasPageElement.create_new( + IliasElementType.FILE, + self._abs_url_from_link(link), + name, + date + )) return results - def _find_exercise_entries_root_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] + def _find_exercise_entries_root_page(self) -> List[IliasPageElement]: + results: List[IliasPageElement] = [] - content_tab = self._soup.find(id="ilContentContainer") - if not content_tab: - log.warn("Could not find content tab in exercise overview page") - _unexpected_html_warning() - return [] + # Each assignment is in an accordion container + assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer") - exercise_links = content_tab.select(".il-item-title a") + for container in assignment_containers: + # Fetch the container name out of the header to use it in the path + container_name = container.select_one(".ilAssignmentHeader").getText().strip() + log.explain(f"Found exercise container {container_name!r}") - for exercise in cast(list[Tag], exercise_links): - if "href" not in exercise.attrs: - continue - href = exercise.attrs["href"] - if type(href) is not str: - continue - if "ass_id=" not in href or "cmdclass=ilassignmentpresentationgui" not in href.lower(): - continue + # Find all download links in the container (this will contain all the files) + files: List[Tag] = container.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, + text="Download" + ) - name = sanitize_path_name(exercise.get_text().strip()) - results.append( - IliasPageElement.create_new( - IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name + # Grab each file as you now have the link + for file_link in files: + # Two divs, side by side. Left is the name, right is the link ==> get left + # sibling + file_name = file_link.parent.findPrevious(name="div").getText().strip() + url = self._abs_url_from_link(file_link) + + log.explain(f"Found exercise entry {file_name!r}") + results.append(IliasPageElement.create_new( + IliasElementType.FILE, + url, + _sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), + mtime=None, # We do not have any timestamp + skip_sanitize=True + )) + + # Find all links to file listings (e.g. "Submitted Files" for groups) + file_listings: List[Tag] = container.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()} + ) + + # Add each listing as a new + for listing in file_listings: + parent_container: Tag = listing.findParent( + "div", attrs={"class": lambda x: x and "form-group" in x} ) - ) - - for result in results: - log.explain(f"Found exercise {result.name!r}") + label_container: Tag = parent_container.find( + attrs={"class": lambda x: x and "control-label" in x} + ) + file_name = label_container.getText().strip() + url = self._abs_url_from_link(listing) + log.explain(f"Found exercise detail {file_name!r} at {url}") + results.append(IliasPageElement.create_new( + IliasElementType.EXERCISE_FILES, + url, + _sanitize_path_name(container_name) + "/" + _sanitize_path_name(file_name), + None, # we do not have any timestamp + skip_sanitize=True + )) return results - def _find_normal_entries(self) -> list[IliasPageElement]: - result: list[IliasPageElement] = [] + def _find_normal_entries(self) -> List[IliasPageElement]: + result: List[IliasPageElement] = [] - links: list[Tag] = [] + links: List[Tag] = [] # Fetch all links and throw them to the general interpreter if self._is_course_overview_page(): log.explain("Page is a course overview page, adjusting link selector") @@ -985,17 +713,15 @@ class IliasPage: for link in links: abs_url = self._abs_url_from_link(link) # Make sure parents are sanitized. We do not want accidental parents - parents = [sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)] + parents = [_sanitize_path_name(x) for x in self._find_upwards_folder_hierarchy(link)] if parents: - element_name = "/".join(parents) + "/" + sanitize_path_name(link.get_text()) + element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText()) else: - element_name = sanitize_path_name(link.get_text()) + element_name = _sanitize_path_name(link.getText()) - element_type = IliasPage._find_type_for_element( - element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link) - ) - description = IliasPage._find_link_description(link) + element_type = self._find_type_from_link(element_name, link, abs_url) + description = self._find_link_description(link) # The last meeting on every page is expanded by default. # Its content is then shown inline *and* in the meeting page itself. @@ -1006,15 +732,17 @@ class IliasPage: if not element_type: continue elif element_type == IliasElementType.FILE: - result.append(IliasPage._file_to_element(element_name, abs_url, link)) + result.append(self._file_to_element(element_name, abs_url, link)) continue - log.explain(f"Found {element_name!r} of type {element_type}") - result.append( - IliasPageElement.create_new( - element_type, abs_url, element_name, description=description, skip_sanitize=True - ) - ) + log.explain(f"Found {element_name!r}") + result.append(IliasPageElement.create_new( + element_type, + abs_url, + element_name, + description=description, + skip_sanitize=True + )) result += self._find_cards() result += self._find_mediacast_videos() @@ -1022,96 +750,74 @@ class IliasPage: return result - def _find_mediacast_videos(self) -> list[IliasPageElement]: - videos: list[IliasPageElement] = [] + def _find_mediacast_videos(self) -> List[IliasPageElement]: + videos: List[IliasPageElement] = [] - regex = re.compile(r"il\.VideoPlaylist\.init.+?\[(.+?)], ") - for script in cast(list[Tag], self._soup.find_all("script")): - for match in regex.finditer(script.text): - try: - playlist = json.loads("[" + match.group(1) + "]") - except json.JSONDecodeError: - log.warn("Could not decode playlist json") - log.warn_contd(f"Playlist json: [{match.group(1)}]") - continue - for elem in playlist: - title = elem.get("title", None) - description = elem.get("description", None) - url = elem.get("resource", None) - if title is None or description is None or url is None: - log.explain(f"Mediacast json: {match.group(1)}") - log.warn("Mediacast video json was not complete") - if title is None: - log.warn_contd("Missing title") - if description is None: - log.warn_contd("Missing description") - if url is None: - log.warn_contd("Missing URL") + for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")): + element_name = _sanitize_path_name( + elem.select_one(".ilPlayerPreviewDescription").getText().strip() + ) + if not element_name.endswith(".mp4"): + # just to make sure it has some kinda-alrightish ending + element_name = element_name + ".mp4" + video_element = elem.find(name="video") + if not video_element: + _unexpected_html_warning() + log.warn_contd(f"No