diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..27246bf --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1 @@ +2cf0e060ed126537dd993896b6aa793e2a6b9e80 diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..3891848 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: github-actions + directory: / + schedule: + interval: monthly + groups: + gh-actions: + patterns: + - "*" diff --git a/.github/workflows/build-and-release.yml b/.github/workflows/build-and-release.yml index 83a36e4..9cd962f 100644 --- a/.github/workflows/build-and-release.yml +++ b/.github/workflows/build-and-release.yml @@ -1,6 +1,6 @@ name: build-and-release -on: push +on: [push, pull_request] defaults: run: @@ -13,28 +13,26 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - python: ["3.9"] + os: [ubuntu-latest, windows-latest, macos-13, macos-latest] + python: ["3.11"] steps: + - uses: actions/checkout@v4 - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 + - name: Install uv + uses: astral-sh/setup-uv@v7 with: python-version: ${{ matrix.python }} - name: Set up project - if: matrix.os != 'windows-latest' - run: ./scripts/setup - - - name: Set up project on windows - if: matrix.os == 'windows-latest' - # For some reason, `pip install --upgrade pip` doesn't work on - # 'windows-latest'. The installed pip version works fine however. 
- run: ./scripts/setup --no-pip + run: uv sync - name: Run checks - run: ./scripts/check + run: | + ./scripts/check + ./scripts/format + + - name: Assert no changes + run: git diff --exit-code - name: Build run: ./scripts/build @@ -45,9 +43,9 @@ jobs: run: mv dist/pferd* dist/pferd-${{ matrix.os }} - name: Upload binary - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: Binaries + name: pferd-${{ matrix.os }} path: dist/pferd-${{ matrix.os }} release: @@ -57,18 +55,20 @@ jobs: steps: - name: Download binaries - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 with: - name: Binaries + pattern: pferd-* + merge-multiple: true - name: Rename binaries run: | mv pferd-ubuntu-latest pferd-linux mv pferd-windows-latest pferd-windows.exe + mv pferd-macos-13 pferd-mac-x86_64 mv pferd-macos-latest pferd-mac - name: Create release - uses: softprops/action-gh-release@v1 + uses: softprops/action-gh-release@v2 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: @@ -76,3 +76,4 @@ jobs: pferd-linux pferd-windows.exe pferd-mac + pferd-mac-x86_64 diff --git a/CHANGELOG.md b/CHANGELOG.md index e404d1d..2a2848c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,95 @@ ambiguous situations. ## Unreleased +## Added +- Store the description when using the `internet-shortcut` link format +- Support for basic auth with the kit-ipd crawler + +## Fixed +- Event loop errors on Windows with Python 3.14 +- Sanitize `/` in headings in kit-ipd crawler +- Crawl info tab again + +## 3.8.3 - 2025-07-01 + +## Added +- Support for link collections. + In "fancy" mode, a single HTML file with multiple links is generated. + In all other modes, PFERD creates a folder for the collection and a new file + for every link inside. + +## Fixed +- Crawling of exercises with instructions +- Don't download unavailable elements. 
+ Elements that are unavailable (for example, because their availability is + time restricted) will not download the HTML for the info page anymore. +- `base_url` argument for `ilias-web` crawler causing crashes + +## 3.8.2 - 2025-04-29 + +## Changed +- Explicitly mention that wikis are not supported at the moment and ignore them + +## Fixed +- Ilias-native login +- Exercise crawling + +## 3.8.1 - 2025-04-17 + +## Fixed +- Description html files now specify at UTF-8 encoding +- Images in descriptions now always have a white background + +## 3.8.0 - 2025-04-16 + +### Added +- Support for ILIAS 9 + +### Changed +- Added prettier CSS to forum threads +- Downloaded forum threads now link to the forum instead of the ILIAS thread +- Increase minimum supported Python version to 3.11 +- Do not crawl nested courses (courses linked in other courses) + +## Fixed +- File links in report on Windows +- TOTP authentication in KIT Shibboleth +- Forum crawling only considering the first 20 entries + +## 3.7.0 - 2024-11-13 + +### Added +- Support for MOB videos in page descriptions +- Clickable links in the report to directly open new/modified/not-deleted files +- Support for non KIT shibboleth login + +### Changed +- Remove videos from description pages +- Perform ILIAS cycle detection after processing the transform to allow + ignoring duplicated elements +- Parse headings (h1-h3) as folders in kit-ipd crawler + +### Fixed +- Personal desktop/dashboard/favorites crawling +- Crawling of nested courses +- Downloading of links with no target URL +- Handle row flex on description pages +- Add `` heading to forum threads to fix mime type detection +- Handle groups in cards + +## 3.6.0 - 2024-10-23 + +### Added +- Generic `ilias-web` crawler and `ilias-web` CLI command +- Support for the course overview page. Using this URL as a target might cause + duplication warnings, as subgroups are listed separately. 
+- Support for named capture groups in regex transforms +- Crawl custom item groups as folders + +### Fixed +- Normalization of meeting names in cards +- Sanitization of slashes in exercise container names + ## 3.5.2 - 2024-04-14 ### Fixed diff --git a/CONFIG.md b/CONFIG.md index 5f62749..b87f75c 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header, which is followed by a list of `key = value` pairs. Comments must be on their own line and start with `#`. Multiline values must be indented beyond their key. Boolean values can be `yes` or `no`. For more details and some examples on the -format, see the [configparser documentation][1] ([interpolation][2] is -disabled). +format, see the [configparser documentation][cp-file] +([interpolation][cp-interp] is disabled). -[1]: "Supported INI File Structure" -[2]: "Interpolation of values" +[cp-file]: "Supported INI File Structure" +[cp-interp]: "Interpolation of values" ## The `DEFAULT` section @@ -146,13 +146,73 @@ crawler simulate a slower, network-based crawler. This crawler crawls a KIT-IPD page by url. The root page can be crawled from outside the KIT network so you will be informed about any new/deleted files, -but downloading files requires you to be within. Adding a show delay between +but downloading files requires you to be within. Adding a short delay between requests is likely a good idea. - `target`: URL to a KIT-IPD page - `link_regex`: A regex that is matched against the `href` part of links. If it matches, the given link is downloaded as a file. This is used to extract files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`) +- `auth`: Name of auth section to use for basic authentication. (Optional) + +### The `ilias-web` crawler + +This crawler crawls a generic ILIAS instance. 
+ +Inspired by [this ILIAS downloader][ilias-dl], the following configurations should work +out of the box for the corresponding universities: + +[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" + +| University | `base_url` | `login_type` | `client_id` | +|-----------------|-----------------------------------------|--------------|---------------| +| FH Aachen | https://www.ili.fh-aachen.de | local | elearning | +| HHU Düsseldorf | https://ilias.hhu.de | local | UniRZ | +| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk | +| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ | +| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart | +| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | | +| KIT ILIAS Pilot | https://pilot.ilias.studium.kit.edu | shibboleth | pilot | + +If your university isn't listed, try navigating to your instance's login page. +Assuming no custom login service is used, the URL will look something like this: + +```jinja +{{ base_url }}/login.php?client_id={{ client_id }}&cmd=force_login&lang= +``` + +If the values work, feel free to submit a PR and add them to the table above. + +- `base_url`: The URL where the ILIAS instance is located. (Required) +- `login_type`: How you authenticate. (Required) + - `local`: Use `client_id` for authentication. + - `shibboleth`: Use shibboleth for authentication. +- `client_id`: An ID used for authentication if `login_type` is `local`. Is + ignored if `login_type` is `shibboleth`. +- `target`: The ILIAS element to crawl. (Required) + - `desktop`: Crawl your personal desktop / dashboard + - ``: Crawl the course with the given id + - ``: Crawl a given element by URL (preferably the permanent URL linked + at the bottom of its ILIAS page). + This also supports the "My Courses" overview page to download *all* + courses. 
Note that this might produce confusing local directory layouts + and duplication warnings if you are a member of an ILIAS group. The + `desktop` target is generally preferable. +- `auth`: Name of auth section to use for login. (Required) +- `tfa_auth`: Name of auth section to use for two-factor authentication. Only + uses the auth section's password. (Default: Anonymous `tfa` authenticator) +- `links`: How to represent external links. (Default: `fancy`) + - `ignore`: Don't download links. + - `plaintext`: A text file containing only the URL. + - `fancy`: A HTML file looking like the ILIAS link element. + - `internet-shortcut`: An internet shortcut file (`.url` file). +- `link_redirect_delay`: Time (in seconds) until `fancy` link files will + redirect to the actual URL. Set to a negative value to disable the automatic + redirect. (Default: `-1`) +- `videos`: Whether to download videos. (Default: `no`) +- `forums`: Whether to download forum threads. (Default: `no`) +- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default: + `20.0`) ### The `kit-ilias-web` crawler @@ -232,10 +292,10 @@ is stored in the keyring. ### The `pass` authenticator -This authenticator queries the [`pass` password manager][3] for a username and -password. It tries to be mostly compatible with [browserpass][4] and -[passff][5], so see those links for an overview of the format. If PFERD fails -to load your password, you can use the `--explain` flag to see why. +This authenticator queries the [`pass` password manager][pass] for a username +and password. It tries to be mostly compatible with [browserpass][browserpass] +and [passff][passff], so see those links for an overview of the format. If PFERD +fails to load your password, you can use the `--explain` flag to see why. 
- `passname`: The name of the password to use (Required) - `username_prefixes`: A comma-separated list of username line prefixes @@ -243,9 +303,9 @@ to load your password, you can use the `--explain` flag to see why. - `password_prefixes`: A comma-separated list of password line prefixes (Default: `password,pass,secret`) -[3]: "Pass: The Standard Unix Password Manager" -[4]: "Organizing password store" -[5]: "Multi-line format" +[pass]: "Pass: The Standard Unix Password Manager" +[browserpass]: "Organizing password store" +[passff]: "Multi-line format" ### The `tfa` authenticator @@ -344,7 +404,8 @@ matches `SOURCE`, the output path is created using `TARGET` as template. be referred to as `{g}` (e.g. `{g3}`). `{g0}` refers to the original path. If capturing group *n*'s contents are a valid integer, the integer value is available as `{i}` (e.g. `{i3}`). If capturing group *n*'s contents are a -valid float, the float value is available as `{f}` (e.g. `{f3}`). If a +valid float, the float value is available as `{f}` (e.g. `{f3}`). Named capture +groups (e.g. `(?P)`) are available by their name (e.g. `{name}`). If a capturing group is not present (e.g. when matching the string `cd` with the regex `(ab)?cd`), the corresponding variables are not defined. diff --git a/DEV.md b/DEV.md index f577b93..8cc42c2 100644 --- a/DEV.md +++ b/DEV.md @@ -9,30 +9,25 @@ particular [this][ppug-1] and [this][ppug-2] guide). ## Setting up a dev environment -The use of [venv][venv] is recommended. To initially set up a development -environment, run these commands in the same directory as this file: +The use of [venv][venv] and [uv][uv] is recommended. To initially set up a +development environment, run these commands in the same directory as this file: ``` -$ python -m venv .venv +$ uv sync $ . .venv/bin/activate -$ ./scripts/setup ``` -The setup script installs a few required dependencies and tools. 
It also -installs PFERD via `pip install --editable .`, which means that you can just run -`pferd` as if it was installed normally. Since PFERD was installed with -`--editable`, there is no need to re-run `pip install` when the source code is -changed. - -If you get any errors because pip can't update itself, try running -`./scripts/setup --no-pip` instead of `./scripts/setup`. +This install all required dependencies and tools. It also installs PFERD as +*editable*, which means that you can just run `pferd` as if it was installed +normally. Since PFERD was installed with `--editable`, there is no need to +re-run `uv sync` when the source code is changed. For more details, see [this part of the Python Tutorial][venv-tut] and [this section on "development mode"][ppug-dev]. [venv]: "venv - Creation of virtual environments" [venv-tut]: "12. Virtual Environments and Packages" -[ppug-dev]: "Working in “development mode”" +[uv]: "uv - An extremely fast Python package and project manager" ## Checking and formatting the code diff --git a/LICENSE b/LICENSE index d81e827..ccccbe3 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, +Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim, thelukasprobst, Toorero, - Mr-Pine + Mr-Pine, p-fruck, PinieP Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/PFERD/__main__.py b/PFERD/__main__.py index cb8c67c..2de9dbc 100644 --- a/PFERD/__main__.py +++ b/PFERD/__main__.py @@ -133,7 +133,8 @@ def main() -> None: # https://bugs.python.org/issue39232 # https://github.com/encode/httpx/issues/914#issuecomment-780023632 # TODO Fix this properly - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) loop.run_until_complete(pferd.run(args.debug_transforms)) 
loop.run_until_complete(asyncio.sleep(1)) loop.close() diff --git a/PFERD/auth/__init__.py b/PFERD/auth/__init__.py index aa3ba8e..7295c7a 100644 --- a/PFERD/auth/__init__.py +++ b/PFERD/auth/__init__.py @@ -1,5 +1,5 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..config import Config from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 @@ -9,21 +9,19 @@ from .pass_ import PassAuthenticator, PassAuthSection from .simple import SimpleAuthenticator, SimpleAuthSection from .tfa import TfaAuthenticator -AuthConstructor = Callable[[ - str, # Name (without the "auth:" prefix) - SectionProxy, # Authenticator's section of global config - Config, # Global config -], Authenticator] +AuthConstructor = Callable[ + [ + str, # Name (without the "auth:" prefix) + SectionProxy, # Authenticator's section of global config + Config, # Global config + ], + Authenticator, +] -AUTHENTICATORS: Dict[str, AuthConstructor] = { - "credential-file": lambda n, s, c: - CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), - "keyring": lambda n, s, c: - KeyringAuthenticator(n, KeyringAuthSection(s)), - "pass": lambda n, s, c: - PassAuthenticator(n, PassAuthSection(s)), - "simple": lambda n, s, c: - SimpleAuthenticator(n, SimpleAuthSection(s)), - "tfa": lambda n, s, c: - TfaAuthenticator(n), +AUTHENTICATORS: dict[str, AuthConstructor] = { + "credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), + "keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)), + "pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)), + "simple": lambda n, s, c: SimpleAuthenticator(n, SimpleAuthSection(s)), + "tfa": lambda n, s, c: TfaAuthenticator(n), } diff --git a/PFERD/auth/authenticator.py b/PFERD/auth/authenticator.py index 643a2d5..417b7ba 100644 --- a/PFERD/auth/authenticator.py +++ b/PFERD/auth/authenticator.py @@ -1,5 
+1,4 @@ from abc import ABC, abstractmethod -from typing import Tuple from ..config import Section @@ -35,7 +34,7 @@ class Authenticator(ABC): self.name = name @abstractmethod - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: pass async def username(self) -> str: diff --git a/PFERD/auth/credential_file.py b/PFERD/auth/credential_file.py index 94ffa73..cb7834c 100644 --- a/PFERD/auth/credential_file.py +++ b/PFERD/auth/credential_file.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import Tuple from ..config import Config from ..utils import fmt_real_path @@ -23,7 +22,9 @@ class CredentialFileAuthenticator(Authenticator): with open(path, encoding="utf-8") as f: lines = list(f) except UnicodeDecodeError: - raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8") + raise AuthLoadError( + f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8" + ) from None except OSError as e: raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e @@ -42,5 +43,5 @@ class CredentialFileAuthenticator(Authenticator): self._username = uline[9:] self._password = pline[9:] - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: return self._username, self._password diff --git a/PFERD/auth/keyring.py b/PFERD/auth/keyring.py index c14f6fb..414640a 100644 --- a/PFERD/auth/keyring.py +++ b/PFERD/auth/keyring.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Optional import keyring @@ -17,7 +17,6 @@ class KeyringAuthSection(AuthSection): class KeyringAuthenticator(Authenticator): - def __init__(self, name: str, section: KeyringAuthSection) -> None: super().__init__(name) @@ -28,7 +27,7 @@ class KeyringAuthenticator(Authenticator): self._password_invalidated = False self._username_fixed = section.username() is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> 
tuple[str, str]: # Request the username if self._username is None: async with log.exclusive_output(): diff --git a/PFERD/auth/pass_.py b/PFERD/auth/pass_.py index 4c8e775..c5d9b24 100644 --- a/PFERD/auth/pass_.py +++ b/PFERD/auth/pass_.py @@ -1,6 +1,5 @@ import re import subprocess -from typing import List, Tuple from ..logging import log from .authenticator import Authenticator, AuthError, AuthSection @@ -12,11 +11,11 @@ class PassAuthSection(AuthSection): self.missing_value("passname") return value - def username_prefixes(self) -> List[str]: + def username_prefixes(self) -> list[str]: value = self.s.get("username_prefixes", "login,username,user") return [prefix.lower() for prefix in value.split(",")] - def password_prefixes(self) -> List[str]: + def password_prefixes(self) -> list[str]: value = self.s.get("password_prefixes", "password,pass,secret") return [prefix.lower() for prefix in value.split(",")] @@ -31,14 +30,14 @@ class PassAuthenticator(Authenticator): self._username_prefixes = section.username_prefixes() self._password_prefixes = section.password_prefixes() - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: log.explain_topic("Obtaining credentials from pass") try: log.explain(f"Calling 'pass show {self._passname}'") result = subprocess.check_output(["pass", "show", self._passname], text=True) except subprocess.CalledProcessError as e: - raise AuthError(f"Failed to get password info from {self._passname}: {e}") + raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e prefixed = {} unprefixed = [] diff --git a/PFERD/auth/simple.py b/PFERD/auth/simple.py index 831c12f..dea4b67 100644 --- a/PFERD/auth/simple.py +++ b/PFERD/auth/simple.py @@ -1,4 +1,4 @@ -from typing import Optional, Tuple +from typing import Optional from ..logging import log from ..utils import agetpass, ainput @@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator): self._username_fixed = self.username is not 
None self._password_fixed = self.password is not None - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: if self._username is not None and self._password is not None: return self._username, self._password diff --git a/PFERD/auth/tfa.py b/PFERD/auth/tfa.py index 26b1383..6ae48fe 100644 --- a/PFERD/auth/tfa.py +++ b/PFERD/auth/tfa.py @@ -1,5 +1,3 @@ -from typing import Tuple - from ..logging import log from ..utils import ainput from .authenticator import Authenticator, AuthError @@ -17,7 +15,7 @@ class TfaAuthenticator(Authenticator): code = await ainput("TFA code: ") return code - async def credentials(self) -> Tuple[str, str]: + async def credentials(self) -> tuple[str, str]: raise AuthError("TFA authenticator does not support usernames") def invalidate_username(self) -> None: diff --git a/PFERD/cli/__init__.py b/PFERD/cli/__init__.py index efa8f00..c89f6f4 100644 --- a/PFERD/cli/__init__.py +++ b/PFERD/cli/__init__.py @@ -8,6 +8,7 @@ # well. from . import command_local # noqa: F401 imported but unused +from . import command_ilias_web # noqa: F401 imported but unused from . import command_kit_ilias_web # noqa: F401 imported but unused from . 
import command_kit_ipd # noqa: F401 imported but unused from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused diff --git a/PFERD/cli/command_ilias_web.py b/PFERD/cli/command_ilias_web.py new file mode 100644 index 0000000..b68e48f --- /dev/null +++ b/PFERD/cli/command_ilias_web.py @@ -0,0 +1,53 @@ +import argparse +import configparser + +from ..logging import log +from .common_ilias_args import configure_common_group_args, load_common +from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler + +COMMAND_NAME = "ilias-web" + +SUBPARSER = SUBPARSERS.add_parser( + COMMAND_NAME, + parents=[CRAWLER_PARSER], +) + +GROUP = SUBPARSER.add_argument_group( + title=f"{COMMAND_NAME} crawler arguments", + description=f"arguments for the '{COMMAND_NAME}' crawler", +) + +GROUP.add_argument( + "--base-url", + type=str, + metavar="BASE_URL", + help="The base url of the ilias instance", +) + +GROUP.add_argument( + "--client-id", + type=str, + metavar="CLIENT_ID", + help="The client id of the ilias instance", +) + +configure_common_group_args(GROUP) + + +def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None: + log.explain(f"Creating config for command '{COMMAND_NAME}'") + + parser["crawl:ilias"] = {} + section = parser["crawl:ilias"] + load_crawler(args, section) + + section["type"] = COMMAND_NAME + if args.base_url is not None: + section["base_url"] = args.base_url + if args.client_id is not None: + section["client_id"] = args.client_id + + load_common(section, args, parser) + + +SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_kit_ilias_web.py b/PFERD/cli/command_kit_ilias_web.py index de74fc3..b3b45c5 100644 --- a/PFERD/cli/command_kit_ilias_web.py +++ b/PFERD/cli/command_kit_ilias_web.py @@ -1,120 +1,37 @@ import argparse import configparser -from pathlib import Path -from ..crawl.ilias.file_templates import Links from ..logging import log -from .parser import (CRAWLER_PARSER, SUBPARSERS, 
BooleanOptionalAction, ParserLoadError, load_crawler, - show_value_error) +from .common_ilias_args import configure_common_group_args, load_common +from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler + +COMMAND_NAME = "kit-ilias-web" SUBPARSER = SUBPARSERS.add_parser( - "kit-ilias-web", + COMMAND_NAME, parents=[CRAWLER_PARSER], ) GROUP = SUBPARSER.add_argument_group( - title="kit-ilias-web crawler arguments", - description="arguments for the 'kit-ilias-web' crawler", -) -GROUP.add_argument( - "target", - type=str, - metavar="TARGET", - help="course id, 'desktop', or ILIAS URL to crawl" -) -GROUP.add_argument( - "output", - type=Path, - metavar="OUTPUT", - help="output directory" -) -GROUP.add_argument( - "--username", "-u", - type=str, - metavar="USERNAME", - help="user name for authentication" -) -GROUP.add_argument( - "--keyring", - action=BooleanOptionalAction, - help="use the system keyring to store and retrieve passwords" -) -GROUP.add_argument( - "--credential-file", - type=Path, - metavar="PATH", - help="read username and password from a credential file" -) -GROUP.add_argument( - "--links", - type=show_value_error(Links.from_string), - metavar="OPTION", - help="how to represent external links" -) -GROUP.add_argument( - "--link-redirect-delay", - type=int, - metavar="SECONDS", - help="time before 'fancy' links redirect to to their target (-1 to disable)" -) -GROUP.add_argument( - "--videos", - action=BooleanOptionalAction, - help="crawl and download videos" -) -GROUP.add_argument( - "--forums", - action=BooleanOptionalAction, - help="crawl and download forum posts" -) -GROUP.add_argument( - "--http-timeout", "-t", - type=float, - metavar="SECONDS", - help="timeout for all HTTP requests" + title=f"{COMMAND_NAME} crawler arguments", + description=f"arguments for the '{COMMAND_NAME}' crawler", ) +configure_common_group_args(GROUP) + def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: 
configparser.ConfigParser, ) -> None: - log.explain("Creating config for command 'kit-ilias-web'") + log.explain(f"Creating config for command '{COMMAND_NAME}'") parser["crawl:ilias"] = {} section = parser["crawl:ilias"] load_crawler(args, section) - section["type"] = "kit-ilias-web" - section["target"] = str(args.target) - section["output_dir"] = str(args.output) - section["auth"] = "auth:ilias" - if args.links is not None: - section["links"] = str(args.links.value) - if args.link_redirect_delay is not None: - section["link_redirect_delay"] = str(args.link_redirect_delay) - if args.videos is not None: - section["videos"] = "yes" if args.videos else "no" - if args.forums is not None: - section["forums"] = "yes" if args.forums else "no" - if args.http_timeout is not None: - section["http_timeout"] = str(args.http_timeout) - - parser["auth:ilias"] = {} - auth_section = parser["auth:ilias"] - if args.credential_file is not None: - if args.username is not None: - raise ParserLoadError("--credential-file and --username can't be used together") - if args.keyring: - raise ParserLoadError("--credential-file and --keyring can't be used together") - auth_section["type"] = "credential-file" - auth_section["path"] = str(args.credential_file) - elif args.keyring: - auth_section["type"] = "keyring" - else: - auth_section["type"] = "simple" - if args.username is not None: - auth_section["username"] = args.username + section["type"] = COMMAND_NAME + load_common(section, args, parser) SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_kit_ipd.py b/PFERD/cli/command_kit_ipd.py index b53e67e..a80af03 100644 --- a/PFERD/cli/command_kit_ipd.py +++ b/PFERD/cli/command_kit_ipd.py @@ -18,25 +18,30 @@ GROUP.add_argument( "--link-regex", type=str, metavar="REGEX", - help="href-matching regex to identify downloadable files" + help="href-matching regex to identify downloadable files", +) +GROUP.add_argument( + "--basic-auth", + action="store_true", + help="enable basic 
authentication", ) GROUP.add_argument( "target", type=str, metavar="TARGET", - help="url to crawl" + help="url to crawl", ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory" + help="output directory", ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'kit-ipd'") @@ -50,5 +55,11 @@ def load( if args.link_regex: section["link_regex"] = str(args.link_regex) + if args.basic_auth: + section["auth"] = "auth:kit-ipd" + parser["auth:kit-ipd"] = {} + auth_section = parser["auth:kit-ipd"] + auth_section["type"] = "simple" + SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_local.py b/PFERD/cli/command_local.py index 309c42f..6016afa 100644 --- a/PFERD/cli/command_local.py +++ b/PFERD/cli/command_local.py @@ -18,37 +18,37 @@ GROUP.add_argument( "target", type=Path, metavar="TARGET", - help="directory to crawl" + help="directory to crawl", ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory" + help="output directory", ) GROUP.add_argument( "--crawl-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for crawl requests" + help="artificial delay to simulate for crawl requests", ) GROUP.add_argument( "--download-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for download requests" + help="artificial delay to simulate for download requests", ) GROUP.add_argument( "--download-speed", type=int, metavar="BYTES_PER_SECOND", - help="download speed to simulate" + help="download speed to simulate", ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'local'") diff --git a/PFERD/cli/common_ilias_args.py b/PFERD/cli/common_ilias_args.py new file mode 100644 index 
0000000..edad6da --- /dev/null +++ b/PFERD/cli/common_ilias_args.py @@ -0,0 +1,106 @@ +import argparse +import configparser +from pathlib import Path + +from ..crawl.ilias.file_templates import Links +from .parser import BooleanOptionalAction, ParserLoadError, show_value_error + + +def configure_common_group_args(group: argparse._ArgumentGroup) -> None: + """These arguments are shared between the KIT and generic Ilias web command.""" + group.add_argument( + "target", + type=str, + metavar="TARGET", + help="course id, 'desktop', or ILIAS URL to crawl", + ) + group.add_argument( + "output", + type=Path, + metavar="OUTPUT", + help="output directory", + ) + group.add_argument( + "--username", + "-u", + type=str, + metavar="USERNAME", + help="user name for authentication", + ) + group.add_argument( + "--keyring", + action=BooleanOptionalAction, + help="use the system keyring to store and retrieve passwords", + ) + group.add_argument( + "--credential-file", + type=Path, + metavar="PATH", + help="read username and password from a credential file", + ) + group.add_argument( + "--links", + type=show_value_error(Links.from_string), + metavar="OPTION", + help="how to represent external links", + ) + group.add_argument( + "--link-redirect-delay", + type=int, + metavar="SECONDS", + help="time before 'fancy' links redirect to to their target (-1 to disable)", + ) + group.add_argument( + "--videos", + action=BooleanOptionalAction, + help="crawl and download videos", + ) + group.add_argument( + "--forums", + action=BooleanOptionalAction, + help="crawl and download forum posts", + ) + group.add_argument( + "--http-timeout", + "-t", + type=float, + metavar="SECONDS", + help="timeout for all HTTP requests", + ) + + +def load_common( + section: configparser.SectionProxy, + args: argparse.Namespace, + parser: configparser.ConfigParser, +) -> None: + """Load common config between generic and KIT ilias web command""" + section["target"] = str(args.target) + section["output_dir"] = 
str(args.output) + section["auth"] = "auth:ilias" + if args.links is not None: + section["links"] = str(args.links.value) + if args.link_redirect_delay is not None: + section["link_redirect_delay"] = str(args.link_redirect_delay) + if args.videos is not None: + section["videos"] = "yes" if args.videos else "no" + if args.forums is not None: + section["forums"] = "yes" if args.forums else "no" + if args.http_timeout is not None: + section["http_timeout"] = str(args.http_timeout) + + parser["auth:ilias"] = {} + auth_section = parser["auth:ilias"] + if args.credential_file is not None: + if args.username is not None: + raise ParserLoadError("--credential-file and --username can't be used together") + if args.keyring: + raise ParserLoadError("--credential-file and --keyring can't be used together") + auth_section["type"] = "credential-file" + auth_section["path"] = str(args.credential_file) + elif args.keyring: + auth_section["type"] = "keyring" + else: + auth_section["type"] = "simple" + if args.username is not None: + auth_section["username"] = args.username diff --git a/PFERD/cli/parser.py b/PFERD/cli/parser.py index be483fd..c9bec13 100644 --- a/PFERD/cli/parser.py +++ b/PFERD/cli/parser.py @@ -1,8 +1,9 @@ import argparse import configparser from argparse import ArgumentTypeError +from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Callable, List, Optional, Sequence, Union +from typing import Any, Optional from ..output_dir import OnConflict, Redownload from ..version import NAME, VERSION @@ -15,15 +16,15 @@ class ParserLoadError(Exception): # TODO Replace with argparse version when updating to 3.9? 
class BooleanOptionalAction(argparse.Action): def __init__( - self, - option_strings: List[str], - dest: Any, - default: Any = None, - type: Any = None, - choices: Any = None, - required: Any = False, - help: Any = None, - metavar: Any = None, + self, + option_strings: list[str], + dest: Any, + default: Any = None, + type: Any = None, + choices: Any = None, + required: Any = False, + help: Any = None, + metavar: Any = None, ): if len(option_strings) != 1: raise ValueError("There must be exactly one option string") @@ -48,11 +49,11 @@ class BooleanOptionalAction(argparse.Action): ) def __call__( - self, - parser: argparse.ArgumentParser, - namespace: argparse.Namespace, - values: Union[str, Sequence[Any], None], - option_string: Optional[str] = None, + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: str | Sequence[Any] | None, + option_string: Optional[str] = None, ) -> None: if option_string and option_string in self.option_strings: value = not option_string.startswith("--no-") @@ -67,11 +68,13 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]: Some validation functions (like the from_string in our enums) raise a ValueError. Argparse only pretty-prints ArgumentTypeErrors though, so we need to wrap our ValueErrors. 
""" + def wrapper(input: str) -> Any: try: return inner(input) except ValueError as e: - raise ArgumentTypeError(e) + raise ArgumentTypeError(e) from e + return wrapper @@ -81,52 +84,57 @@ CRAWLER_PARSER_GROUP = CRAWLER_PARSER.add_argument_group( description="arguments common to all crawlers", ) CRAWLER_PARSER_GROUP.add_argument( - "--redownload", "-r", + "--redownload", + "-r", type=show_value_error(Redownload.from_string), metavar="OPTION", - help="when to download a file that's already present locally" + help="when to download a file that's already present locally", ) CRAWLER_PARSER_GROUP.add_argument( "--on-conflict", type=show_value_error(OnConflict.from_string), metavar="OPTION", - help="what to do when local and remote files or directories differ" + help="what to do when local and remote files or directories differ", ) CRAWLER_PARSER_GROUP.add_argument( - "--transform", "-T", + "--transform", + "-T", action="append", type=str, metavar="RULE", - help="add a single transformation rule. Can be specified multiple times" + help="add a single transformation rule. 
Can be specified multiple times", ) CRAWLER_PARSER_GROUP.add_argument( - "--tasks", "-n", + "--tasks", + "-n", type=int, metavar="N", - help="maximum number of concurrent tasks (crawling, downloading)" + help="maximum number of concurrent tasks (crawling, downloading)", ) CRAWLER_PARSER_GROUP.add_argument( - "--downloads", "-N", + "--downloads", + "-N", type=int, metavar="N", - help="maximum number of tasks that may download data at the same time" + help="maximum number of tasks that may download data at the same time", ) CRAWLER_PARSER_GROUP.add_argument( - "--task-delay", "-d", + "--task-delay", + "-d", type=float, metavar="SECONDS", - help="time the crawler should wait between subsequent tasks" + help="time the crawler should wait between subsequent tasks", ) CRAWLER_PARSER_GROUP.add_argument( "--windows-paths", action=BooleanOptionalAction, - help="whether to repair invalid paths on windows" + help="whether to repair invalid paths on windows", ) def load_crawler( - args: argparse.Namespace, - section: configparser.SectionProxy, + args: argparse.Namespace, + section: configparser.SectionProxy, ) -> None: if args.redownload is not None: section["redownload"] = args.redownload.value @@ -152,79 +160,79 @@ PARSER.add_argument( version=f"{NAME} {VERSION} (https://github.com/Garmelon/PFERD)", ) PARSER.add_argument( - "--config", "-c", + "--config", + "-c", type=Path, metavar="PATH", - help="custom config file" + help="custom config file", ) PARSER.add_argument( "--dump-config", action="store_true", - help="dump current configuration to the default config path and exit" + help="dump current configuration to the default config path and exit", ) PARSER.add_argument( "--dump-config-to", metavar="PATH", - help="dump current configuration to a file and exit." - " Use '-' as path to print to stdout instead" + help="dump current configuration to a file and exit. 
Use '-' as path to print to stdout instead", ) PARSER.add_argument( "--debug-transforms", action="store_true", - help="apply transform rules to files of previous run" + help="apply transform rules to files of previous run", ) PARSER.add_argument( - "--crawler", "-C", + "--crawler", + "-C", action="append", type=str, metavar="NAME", - help="only execute a single crawler." - " Can be specified multiple times to execute multiple crawlers" + help="only execute a single crawler. Can be specified multiple times to execute multiple crawlers", ) PARSER.add_argument( - "--skip", "-S", + "--skip", + "-S", action="append", type=str, metavar="NAME", - help="don't execute this particular crawler." - " Can be specified multiple times to skip multiple crawlers" + help="don't execute this particular crawler. Can be specified multiple times to skip multiple crawlers", ) PARSER.add_argument( "--working-dir", type=Path, metavar="PATH", - help="custom working directory" + help="custom working directory", ) PARSER.add_argument( "--explain", action=BooleanOptionalAction, - help="log and explain in detail what PFERD is doing" + help="log and explain in detail what PFERD is doing", ) PARSER.add_argument( "--status", action=BooleanOptionalAction, - help="print status updates while PFERD is crawling" + help="print status updates while PFERD is crawling", ) PARSER.add_argument( "--report", action=BooleanOptionalAction, - help="print a report of all local changes before exiting" + help="print a report of all local changes before exiting", ) PARSER.add_argument( "--share-cookies", action=BooleanOptionalAction, - help="whether crawlers should share cookies where applicable" + help="whether crawlers should share cookies where applicable", ) PARSER.add_argument( "--show-not-deleted", action=BooleanOptionalAction, - help="print messages in status and report when PFERD did not delete a local only file" + help="print messages in status and report when PFERD did not delete a local only file", ) def 
load_default_section( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: section = parser[parser.default_section] diff --git a/PFERD/config.py b/PFERD/config.py index b2cff4e..7da2889 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -3,7 +3,7 @@ import os import sys from configparser import ConfigParser, SectionProxy from pathlib import Path -from typing import Any, List, NoReturn, Optional, Tuple +from typing import Any, NoReturn, Optional from rich.markup import escape @@ -53,10 +53,10 @@ class Section: raise ConfigOptionError(self.s.name, key, desc) def invalid_value( - self, - key: str, - value: Any, - reason: Optional[str], + self, + key: str, + value: Any, + reason: Optional[str], ) -> NoReturn: if reason is None: self.error(key, f"Invalid value {value!r}") @@ -126,13 +126,13 @@ class Config: with open(path, encoding="utf-8") as f: parser.read_file(f, source=str(path)) except FileNotFoundError: - raise ConfigLoadError(path, "File does not exist") + raise ConfigLoadError(path, "File does not exist") from None except IsADirectoryError: - raise ConfigLoadError(path, "That's a directory, not a file") + raise ConfigLoadError(path, "That's a directory, not a file") from None except PermissionError: - raise ConfigLoadError(path, "Insufficient permissions") + raise ConfigLoadError(path, "Insufficient permissions") from None except UnicodeDecodeError: - raise ConfigLoadError(path, "File is not encoded using UTF-8") + raise ConfigLoadError(path, "File is not encoded using UTF-8") from None def dump(self, path: Optional[Path] = None) -> None: """ @@ -150,8 +150,8 @@ class Config: try: path.parent.mkdir(parents=True, exist_ok=True) - except PermissionError: - raise ConfigDumpError(path, "Could not create parent directory") + except PermissionError as e: + raise ConfigDumpError(path, "Could not create parent directory") from e try: # Ensuring we don't accidentally overwrite any 
existing files by @@ -167,16 +167,16 @@ class Config: with open(path, "w", encoding="utf-8") as f: self._parser.write(f) else: - raise ConfigDumpError(path, "File already exists") + raise ConfigDumpError(path, "File already exists") from None except IsADirectoryError: - raise ConfigDumpError(path, "That's a directory, not a file") - except PermissionError: - raise ConfigDumpError(path, "Insufficient permissions") + raise ConfigDumpError(path, "That's a directory, not a file") from None + except PermissionError as e: + raise ConfigDumpError(path, "Insufficient permissions") from e def dump_to_stdout(self) -> None: self._parser.write(sys.stdout) - def crawl_sections(self) -> List[Tuple[str, SectionProxy]]: + def crawl_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("crawl:"): @@ -184,7 +184,7 @@ class Config: return result - def auth_sections(self) -> List[Tuple[str, SectionProxy]]: + def auth_sections(self) -> list[tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("auth:"): diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py index 1f8bd59..9ba6a37 100644 --- a/PFERD/crawl/__init__.py +++ b/PFERD/crawl/__init__.py @@ -1,25 +1,26 @@ +from collections.abc import Callable from configparser import SectionProxy -from typing import Callable, Dict from ..auth import Authenticator from ..config import Config from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401 -from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection +from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection -CrawlerConstructor = Callable[[ - str, # Name (without the "crawl:" prefix) - SectionProxy, # Crawler's section of global config - Config, # Global config - Dict[str, 
Authenticator], # Loaded authenticators by name -], Crawler] +CrawlerConstructor = Callable[ + [ + str, # Name (without the "crawl:" prefix) + SectionProxy, # Crawler's section of global config + Config, # Global config + dict[str, Authenticator], # Loaded authenticators by name + ], + Crawler, +] -CRAWLERS: Dict[str, CrawlerConstructor] = { - "local": lambda n, s, c, a: - LocalCrawler(n, LocalCrawlerSection(s), c), - "kit-ilias-web": lambda n, s, c, a: - KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), - "kit-ipd": lambda n, s, c, a: - KitIpdCrawler(n, KitIpdCrawlerSection(s), c), +CRAWLERS: dict[str, CrawlerConstructor] = { + "local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c), + "ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), + "kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), + "kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c, a), } diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py index 0e67c02..e2cdf30 100644 --- a/PFERD/crawl/crawler.py +++ b/PFERD/crawl/crawler.py @@ -1,10 +1,10 @@ import asyncio import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Coroutine +from collections.abc import Awaitable, Callable, Coroutine, Sequence from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar +from typing import Any, Optional, TypeVar from ..auth import Authenticator from ..config import Config, Section @@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]): return bar -class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): +class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): super().__init__() @@ -128,12 +128,13 @@ class 
DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): def path(self) -> PurePath: return self._path - async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: + async def _on_aenter(self) -> tuple[ProgressBar, FileSink]: await self._stack.enter_async_context(self._limiter.limit_download()) sink = await self._stack.enter_async_context(self._fs_token) # The "Downloaded ..." message is printed in the output dir, not here - bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading", - fmt_path(self._path))) + bar = self._stack.enter_context( + log.download_bar("[bold bright_cyan]", "Downloading", fmt_path(self._path)) + ) return bar, sink @@ -149,9 +150,7 @@ class CrawlerSection(Section): return self.s.getboolean("skip", fallback=False) def output_dir(self, name: str) -> Path: - # TODO Use removeprefix() after switching to 3.9 - if name.startswith("crawl:"): - name = name[len("crawl:"):] + name = name.removeprefix("crawl:") return Path(self.s.get("output_dir", name)).expanduser() def redownload(self) -> Redownload: @@ -206,7 +205,7 @@ class CrawlerSection(Section): on_windows = os.name == "nt" return self.s.getboolean("windows_paths", fallback=on_windows) - def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: + def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: self.missing_value("auth") @@ -218,10 +217,10 @@ class CrawlerSection(Section): class Crawler(ABC): def __init__( - self, - name: str, - section: CrawlerSection, - config: Config, + self, + name: str, + section: CrawlerSection, + config: Config, ) -> None: """ Initialize a crawler from its name and its section in the config file. 
@@ -258,8 +257,12 @@ class Crawler(ABC): def prev_report(self) -> Optional[Report]: return self._output_dir.prev_report + @property + def output_dir(self) -> OutputDirectory: + return self._output_dir + @staticmethod - async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: + async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]: """ Similar to asyncio.gather. However, in the case of an exception, all still running tasks are cancelled and the exception is rethrown. @@ -290,12 +293,39 @@ class Crawler(ABC): log.explain("Answer: Yes") return CrawlToken(self._limiter, path) + def should_try_download( + self, + path: PurePath, + *, + etag_differs: Optional[bool] = None, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, + ) -> bool: + log.explain_topic(f"Decision: Should Download {fmt_path(path)}") + + if self._transformer.transform(path) is None: + log.explain("Answer: No (ignored)") + return False + + should_download = self._output_dir.should_try_download( + path, etag_differs=etag_differs, mtime=mtime, redownload=redownload, on_conflict=on_conflict + ) + if should_download: + log.explain("Answer: Yes") + return True + else: + log.explain("Answer: No") + return False + async def download( - self, - path: PurePath, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, + self, + path: PurePath, + *, + etag_differs: Optional[bool] = None, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, ) -> Optional[DownloadToken]: log.explain_topic(f"Decision: Download {fmt_path(path)}") path = self._deduplicator.mark(path) @@ -307,7 +337,14 @@ class Crawler(ABC): log.status("[bold bright_black]", "Ignored", fmt_path(path)) return None - fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict) + fs_token = 
await self._output_dir.download( + path, + transformed_path, + etag_differs=etag_differs, + mtime=mtime, + redownload=redownload, + on_conflict=on_conflict, + ) if fs_token is None: log.explain("Answer: No") return None @@ -357,7 +394,7 @@ class Crawler(ABC): log.warn("Couldn't find or load old report") return - seen: Set[PurePath] = set() + seen: set[PurePath] = set() for known in sorted(self.prev_report.found_paths): looking_at = list(reversed(known.parents)) + [known] for path in looking_at: diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py index 44ec4dd..49d6013 100644 --- a/PFERD/crawl/http_crawler.py +++ b/PFERD/crawl/http_crawler.py @@ -1,35 +1,39 @@ import asyncio import http.cookies import ssl +from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Dict, List, Optional +from typing import Any, Optional import aiohttp import certifi from aiohttp.client import ClientTimeout +from bs4 import Tag from ..auth import Authenticator from ..config import Config from ..logging import log -from ..utils import fmt_real_path +from ..utils import fmt_real_path, sanitize_path_name from ..version import NAME, VERSION from .crawler import Crawler, CrawlerSection +ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags" + class HttpCrawlerSection(CrawlerSection): def http_timeout(self) -> float: - return self.s.getfloat("http_timeout", fallback=20) + return self.s.getfloat("http_timeout", fallback=30) class HttpCrawler(Crawler): COOKIE_FILE = PurePath(".cookies") def __init__( - self, - name: str, - section: HttpCrawlerSection, - config: Config, - shared_auth: Optional[Authenticator] = None, + self, + name: str, + section: HttpCrawlerSection, + config: Config, + shared_auth: Optional[Authenticator] = None, ) -> None: super().__init__(name, section, config) @@ -39,7 +43,7 @@ class HttpCrawler(Crawler): self._http_timeout = section.http_timeout() self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) - 
self._shared_cookie_jar_paths: Optional[List[Path]] = None + self._shared_cookie_jar_paths: Optional[list[Path]] = None self._shared_auth = shared_auth self._output_dir.register_reserved(self.COOKIE_FILE) @@ -94,7 +98,7 @@ class HttpCrawler(Crawler): """ raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation") - def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None: + def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None: if not self._shared_auth: return @@ -169,24 +173,102 @@ class HttpCrawler(Crawler): log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(str(e)) + @staticmethod + def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath: + """ + Retrieves the hierarchy of headings associated with the give file link and constructs a folder + structure from them. + +
<h1>
level headings usually only appear once and serve as the page title, so they would introduce + redundant nesting. To avoid this,
<h1>
headings are ignored via the drop_h1 parameter. + """ + + def find_associated_headings(tag: Tag, level: int) -> PurePath: + if level == 0 or (level == 1 and drop_h1): + return PurePath() + + level_heading = tag.find_previous(name=f"h{level}") + + if level_heading is None: + return find_associated_headings(tag, level - 1) + + folder_name = sanitize_path_name(level_heading.get_text().strip()) + return find_associated_headings(level_heading, level - 1) / folder_name + + # start at level
<h3>
because paragraph-level headings are usually too granular for folder names + return find_associated_headings(file_link, 3) + + def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]: + """ + If available, retrieves the entity tag for a given path which was stored in the previous report. + """ + if not self._output_dir.prev_report: + return None + + etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} + return etags.get(str(path)) + + def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None: + """ + Adds an entity tag for a given path to the report's custom values. + """ + if not etag: + return + + etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} + etags[str(path)] = etag + self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags) + + async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]: + """ + Requests the ETag and Last-Modified headers of a resource via a HEAD request. + If no entity tag / modification date can be obtained, the according value will be None. 
+ """ + try: + async with self.session.head(resource_url) as resp: + if resp.status != 200: + return None, None + + etag_header = resp.headers.get("ETag") + last_modified_header = resp.headers.get("Last-Modified") + last_modified = None + + if last_modified_header: + try: + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives + datetime_format = "%a, %d %b %Y %H:%M:%S GMT" + last_modified = datetime.strptime(last_modified_header, datetime_format) + except ValueError: + # last_modified remains None + pass + + return etag_header, last_modified + except aiohttp.ClientError: + return None, None + async def run(self) -> None: self._request_count = 0 self._cookie_jar = aiohttp.CookieJar() self._load_cookies() async with aiohttp.ClientSession( - headers={"User-Agent": f"{NAME}/{VERSION}"}, - cookie_jar=self._cookie_jar, - connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), - timeout=ClientTimeout( - # 30 minutes. No download in the history of downloads was longer than 30 minutes. - # This is enough to transfer a 600 MB file over a 3 Mib/s connection. - # Allowing an arbitrary value could be annoying for overnight batch jobs - total=15 * 60, - connect=self._http_timeout, - sock_connect=self._http_timeout, - sock_read=self._http_timeout, - ) + headers={"User-Agent": f"{NAME}/{VERSION}"}, + cookie_jar=self._cookie_jar, + connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), + timeout=ClientTimeout( + # 30 minutes. No download in the history of downloads was longer than 30 minutes. + # This is enough to transfer a 600 MB file over a 3 Mib/s connection. 
+ # Allowing an arbitrary value could be annoying for overnight batch jobs + total=15 * 60, + connect=self._http_timeout, + sock_connect=self._http_timeout, + sock_read=self._http_timeout, + ), + # See https://github.com/aio-libs/aiohttp/issues/6626 + # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the + # passed signature. Shibboleth will not accept the broken signature and authentication will + # fail. + requote_redirect_url=False, ) as session: self.session = session try: diff --git a/PFERD/crawl/ilias/__init__.py b/PFERD/crawl/ilias/__init__.py index 26618a8..fa1aaed 100644 --- a/PFERD/crawl/ilias/__init__.py +++ b/PFERD/crawl/ilias/__init__.py @@ -1,3 +1,13 @@ -from .kit_ilias_web_crawler import KitIliasWebCrawler, KitIliasWebCrawlerSection +from .kit_ilias_web_crawler import ( + IliasWebCrawler, + IliasWebCrawlerSection, + KitIliasWebCrawler, + KitIliasWebCrawlerSection, +) -__all__ = ["KitIliasWebCrawler", "KitIliasWebCrawlerSection"] +__all__ = [ + "IliasWebCrawler", + "IliasWebCrawlerSection", + "KitIliasWebCrawler", + "KitIliasWebCrawlerSection", +] diff --git a/PFERD/crawl/ilias/async_helper.py b/PFERD/crawl/ilias/async_helper.py new file mode 100644 index 0000000..2e6b301 --- /dev/null +++ b/PFERD/crawl/ilias/async_helper.py @@ -0,0 +1,41 @@ +import asyncio +from collections.abc import Callable +from typing import Any, Optional + +import aiohttp + +from ...logging import log +from ..crawler import AWrapped, CrawlError, CrawlWarning + + +def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]: + def decorator(f: AWrapped) -> AWrapped: + async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]: + last_exception: Optional[BaseException] = None + for round in range(attempts): + try: + return await f(*args, **kwargs) + except aiohttp.ContentTypeError: # invalid content type + raise CrawlWarning("ILIAS returned an invalid content type") from None + except 
aiohttp.TooManyRedirects: + raise CrawlWarning("Got stuck in a redirect loop") from None + except aiohttp.ClientPayloadError as e: # encoding or not enough bytes + last_exception = e + except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc. + last_exception = e + except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler + last_exception = e + log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}") + log.explain(f"Last exception: {last_exception!r}") + + if last_exception: + message = f"Error in I/O Operation: {last_exception!r}" + if failure_is_error: + raise CrawlError(message) from last_exception + else: + raise CrawlWarning(message) from last_exception + raise CrawlError("Impossible return in ilias _iorepeat") + + return wrapper # type: ignore + + return decorator diff --git a/PFERD/crawl/ilias/file_templates.py b/PFERD/crawl/ilias/file_templates.py index b206461..c832977 100644 --- a/PFERD/crawl/ilias/file_templates.py +++ b/PFERD/crawl/ilias/file_templates.py @@ -1,5 +1,7 @@ +import dataclasses +import re from enum import Enum -from typing import Optional +from typing import Optional, cast import bs4 @@ -12,7 +14,9 @@ _link_template_fancy = """ ILIAS - Link: {{name}} + + -
- -
-
- {{name}} +
+ +
+ -
{{description}}
+
+
+ {{name}} +
+
{{description}}
+
+
- +
@@ -96,6 +111,7 @@ _link_template_fancy = """ _link_template_internet_shortcut = """ [InternetShortcut] URL={{link}} +Desc={{description}} """.strip() _learning_module_template = """ @@ -126,6 +142,88 @@ _learning_module_template = """ """ +_forum_thread_template = """ + + + + + ILIAS - Forum: {{name}} + + + + {{heading}} + {{content}} + + +""".strip() # noqa: E501 line too long + def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: # Seems to be comments, ignore those. @@ -139,13 +237,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
""" if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): - text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() + text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip() left = f'{text}' else: left = "" if next and body.select_one(".ilc_page_rnav_RightNavigation"): - text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() + text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip() right = f'{text}' else: right = "" @@ -156,12 +254,29 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next ) if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"): - bot_nav.replace_with(soupify(nav_template.replace( - "{{left}}", left).replace("{{right}}", right).encode()) + bot_nav.replace_with( + soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode()) ) - body = body.prettify() - return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) + body_str = body.prettify() + return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name) + + +def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str: + if title := heading.find(name="b"): + title.wrap(bs4.Tag(name="a", attrs={"href": url})) + return ( + _forum_thread_template.replace("{{name}}", name) + .replace("{{heading}}", heading.prettify()) + .replace("{{content}}", content.prettify()) + ) + + +@dataclasses.dataclass +class LinkData: + name: str + url: str + description: str class Links(Enum): @@ -181,6 +296,9 @@ class Links(Enum): return None raise ValueError("Missing switch case") + def collection_as_one(self) -> bool: + return self == Links.FANCY + def extension(self) -> Optional[str]: if self == Links.FANCY: return ".html" @@ -192,10 +310,47 @@ class Links(Enum): return None raise ValueError("Missing switch case") + def interpolate(self, redirect_delay: int, 
collection_name: str, links: list[LinkData]) -> str: + template = self.template() + if template is None: + raise ValueError("Cannot interpolate ignored links") + + if len(links) == 1: + link = links[0] + content = template + content = content.replace("{{link}}", link.url) + content = content.replace("{{name}}", link.name) + content = content.replace("{{description}}", link.description) + content = content.replace("{{redirect_delay}}", str(redirect_delay)) + return content + if self == Links.PLAINTEXT or self == Links.INTERNET_SHORTCUT: + return "\n".join(f"{link.url}" for link in links) + + # All others get coerced to fancy + content = cast(str, Links.FANCY.template()) + repeated_content = cast( + re.Match[str], re.search(r"([\s\S]+)", content) + ).group(1) + + parts = [] + for link in links: + instance = repeated_content + instance = instance.replace("{{link}}", link.url) + instance = instance.replace("{{name}}", link.name) + instance = instance.replace("{{description}}", link.description) + instance = instance.replace("{{redirect_delay}}", str(redirect_delay)) + parts.append(instance) + + content = content.replace(repeated_content, "\n".join(parts)) + content = content.replace("{{name}}", collection_name) + content = re.sub(r"[\s\S]+", "", content) + + return content + @staticmethod def from_string(string: str) -> "Links": try: return Links(string) except ValueError: - raise ValueError("must be one of 'ignore', 'plaintext'," - " 'html', 'internet-shortcut'") + options = [f"'{option.value}'" for option in Links] + raise ValueError(f"must be one of {', '.join(options)}") from None diff --git a/PFERD/crawl/ilias/ilias_html_cleaner.py b/PFERD/crawl/ilias/ilias_html_cleaner.py index 5495304..35a7ea0 100644 --- a/PFERD/crawl/ilias/ilias_html_cleaner.py +++ b/PFERD/crawl/ilias/ilias_html_cleaner.py @@ -1,3 +1,5 @@ +from typing import cast + from bs4 import BeautifulSoup, Comment, Tag _STYLE_TAG_CONTENT = """ @@ -12,6 +14,13 @@ _STYLE_TAG_CONTENT = """ font-weight: bold; 
} + .row-flex { + display: flex; + } + .row-flex-wrap { + flex-wrap: wrap; + } + .accordion-head { background-color: #f5f7fa; padding: 0.5rem 0; @@ -30,6 +39,10 @@ _STYLE_TAG_CONTENT = """ margin: 0.5rem 0; } + img { + background-color: white; + } + body { padding: 1em; grid-template-columns: 1fr min(60rem, 90%) 1fr; @@ -47,12 +60,11 @@ _ARTICLE_WORTHY_CLASSES = [ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: head = soup.new_tag("head") soup.insert(0, head) + # Force UTF-8 encoding + head.append(soup.new_tag("meta", charset="utf-8")) - simplecss_link: Tag = soup.new_tag("link") # - simplecss_link["rel"] = "stylesheet" - simplecss_link["href"] = "https://cdn.simplecss.org/simple.css" - head.append(simplecss_link) + head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css")) # Basic style tags for compat style: Tag = soup.new_tag("style") @@ -63,18 +75,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup: - for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): + for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)): block.name = "article" - for block in soup.find_all("h3"): + for block in cast(list[Tag], soup.find_all("h3")): block.name = "div" - for block in soup.find_all("h1"): + for block in cast(list[Tag], soup.find_all("h1")): block.name = "h3" - for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): + for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")): block.name = "h3" - block["class"] += ["accordion-head"] + block["class"] += ["accordion-head"] # type: ignore for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): children = list(dummy.children) @@ -85,7 +97,12 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup: if isinstance(type(children[0]), Comment): dummy.decompose() - for hrule_imposter in 
soup.find_all(class_="ilc_section_Separator"): + # Delete video figures, as they can not be internalized anyway + for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"): + if figure := video.find_parent("figure"): + figure.decompose() + + for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")): hrule_imposter.insert(0, soup.new_tag("hr")) return soup diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py new file mode 100644 index 0000000..b5041b3 --- /dev/null +++ b/PFERD/crawl/ilias/ilias_web_crawler.py @@ -0,0 +1,1074 @@ +import asyncio +import base64 +import os +import re +from collections.abc import Awaitable, Coroutine +from pathlib import PurePath +from typing import Any, Literal, Optional, cast +from urllib.parse import urljoin + +import aiohttp +from aiohttp import hdrs +from bs4 import BeautifulSoup, Tag + +from ...auth import Authenticator +from ...config import Config +from ...logging import ProgressBar, log +from ...output_dir import FileSink, Redownload +from ...utils import fmt_path, sanitize_path_name, soupify, url_set_query_param +from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical +from ..http_crawler import HttpCrawler, HttpCrawlerSection +from .async_helper import _iorepeat +from .file_templates import LinkData, Links, forum_thread_template, learning_module_template +from .ilias_html_cleaner import clean, insert_base_markup +from .kit_ilias_html import ( + IliasElementType, + IliasForumThread, + IliasLearningModulePage, + IliasPage, + IliasPageElement, + IliasSoup, + parse_ilias_forum_export, +) +from .shibboleth_login import ShibbolethLogin + +TargetType = str | int + + +class LoginTypeLocal: + def __init__(self, client_id: str): + self.client_id = client_id + + +class IliasWebCrawlerSection(HttpCrawlerSection): + def base_url(self) -> str: + base_url = self.s.get("base_url") + if not base_url: + 
self.missing_value("base_url") + + return base_url + + def login(self) -> Literal["shibboleth"] | LoginTypeLocal: + login_type = self.s.get("login_type") + if not login_type: + self.missing_value("login_type") + if login_type == "shibboleth": + return "shibboleth" + if login_type == "local": + client_id = self.s.get("client_id") + if not client_id: + self.missing_value("client_id") + return LoginTypeLocal(client_id) + + self.invalid_value("login_type", login_type, "Should be ") + + def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]: + value: Optional[str] = self.s.get("tfa_auth") + if value is None: + return None + auth = authenticators.get(value) + if auth is None: + self.invalid_value("tfa_auth", value, "No such auth section exists") + return auth + + def target(self) -> TargetType: + target = self.s.get("target") + if not target: + self.missing_value("target") + + if re.fullmatch(r"\d+", target): + # Course id + return int(target) + if target == "desktop": + # Full personal desktop + return target + if target.startswith(self.base_url()): + # URL + return target + + self.invalid_value("target", target, "Should be ") + + def links(self) -> Links: + type_str: Optional[str] = self.s.get("links") + + if type_str is None: + return Links.FANCY + + try: + return Links.from_string(type_str) + except ValueError as e: + self.invalid_value("links", type_str, str(e).capitalize()) + + def link_redirect_delay(self) -> int: + return self.s.getint("link_redirect_delay", fallback=-1) + + def videos(self) -> bool: + return self.s.getboolean("videos", fallback=False) + + def forums(self) -> bool: + return self.s.getboolean("forums", fallback=False) + + +_DIRECTORY_PAGES: set[IliasElementType] = { + IliasElementType.EXERCISE, + IliasElementType.EXERCISE_FILES, + IliasElementType.EXERCISE_OVERVIEW, + IliasElementType.FOLDER, + IliasElementType.INFO_TAB, + IliasElementType.MEDIACAST_VIDEO_FOLDER, + IliasElementType.MEETING, + 
IliasElementType.OPENCAST_VIDEO_FOLDER, + IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, +} + +_VIDEO_ELEMENTS: set[IliasElementType] = { + IliasElementType.MEDIACAST_VIDEO, + IliasElementType.MEDIACAST_VIDEO_FOLDER, + IliasElementType.OPENCAST_VIDEO, + IliasElementType.OPENCAST_VIDEO_FOLDER, + IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, + IliasElementType.OPENCAST_VIDEO_PLAYER, +} + + +def _get_video_cache_key(element: IliasPageElement) -> str: + return f"ilias-video-cache-{element.id()}" + + +# Crawler control flow: +# +# crawl_desktop -+ +# | +# crawl_course --+ +# | +# @_io_repeat | # retries internally (before the bar) +# +- crawl_url <-+ +# | +# | +# | @_wrap_io_exception # does not need to retry as children acquire bars +# +> crawl_ilias_element -+ +# ^ | +# | @_io_repeat | # retries internally (before the bar) +# +- crawl_ilias_page <---+ +# | | +# +> get_page | # Handles and retries authentication +# | +# @_io_repeat | # retries internally (before the bar) +# +- download_link <---+ +# | | +# +> resolve_target | # Handles and retries authentication +# | +# @_io_repeat | # retries internally (before the bar) +# +- download_video <---+ +# | | +# | @_io_repeat | # retries internally (before the bar) +# +- download_file <---+ +# | +# +> stream_from_url # Handles and retries authentication +class IliasWebCrawler(HttpCrawler): + def __init__( + self, + name: str, + section: IliasWebCrawlerSection, + config: Config, + authenticators: dict[str, Authenticator], + ): + # Setting a main authenticator for cookie sharing + auth = section.auth(authenticators) + super().__init__(name, section, config, shared_auth=auth) + + if section.tasks() > 1: + log.warn( + """ +Please avoid using too many parallel requests as these are the KIT ILIAS +instance's greatest bottleneck. 
+ """.strip() + ) + + self._auth = auth + self._base_url = section.base_url() + self._tfa_auth = section.tfa_auth(authenticators) + + self._login_type = section.login() + if isinstance(self._login_type, LoginTypeLocal): + self._client_id = self._login_type.client_id + else: + self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth) + + self._target = section.target() + self._link_file_redirect_delay = section.link_redirect_delay() + self._links = section.links() + self._videos = section.videos() + self._forums = section.forums() + self._visited_urls: dict[str, PurePath] = dict() + + async def _run(self) -> None: + if isinstance(self._target, int): + log.explain_topic(f"Inferred crawl target: Course with id {self._target}") + await self._crawl_course(self._target) + elif self._target == "desktop": + log.explain_topic("Inferred crawl target: Personal desktop") + await self._crawl_desktop() + else: + log.explain_topic(f"Inferred crawl target: URL {self._target}") + await self._crawl_url(self._target) + + async def _crawl_course(self, course_id: int) -> None: + # Start crawling at the given course + root_url = url_set_query_param( + urljoin(self._base_url + "/", "goto.php"), + "target", + f"crs_{course_id}", + ) + + await self._crawl_url(root_url, expected_id=course_id) + + async def _crawl_desktop(self) -> None: + await self._crawl_url( + urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True + ) + + async def _crawl_url( + self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False + ) -> None: + if awaitable := await self._handle_ilias_page( + url, None, PurePath("."), expected_id, crawl_nested_courses + ): + await awaitable + + async def _handle_ilias_page( + self, + url: str, + current_element: Optional[IliasPageElement], + path: PurePath, + expected_course_id: Optional[int] = None, + crawl_nested_courses: bool = False, + ) -> Optional[Coroutine[Any, Any, None]]: + 
maybe_cl = await self.crawl(path) + if not maybe_cl: + return None + if current_element: + self._ensure_not_seen(current_element, path) + + return self._crawl_ilias_page( + url, current_element, maybe_cl, expected_course_id, crawl_nested_courses + ) + + @anoncritical + async def _crawl_ilias_page( + self, + url: str, + current_element: Optional[IliasPageElement], + cl: CrawlToken, + expected_course_id: Optional[int] = None, + crawl_nested_courses: bool = False, + ) -> None: + elements: list[IliasPageElement] = [] + # A list as variable redefinitions are not propagated to outer scopes + description: list[BeautifulSoup] = [] + + @_iorepeat(3, "crawling folder") + async def gather_elements() -> None: + elements.clear() + async with cl: + next_stage_url: Optional[str] = url + current_parent = current_element + page = None + + while next_stage_url: + soup = await self._get_page(next_stage_url) + log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") + log.explain(f"URL: {next_stage_url}") + + # If we expect to find a root course, enforce it + if current_parent is None and expected_course_id is not None: + perma_link = IliasPage.get_soup_permalink(soup) + if not perma_link or "crs/" not in perma_link: + raise CrawlError("Invalid course id? 
Didn't find anything looking like a course") + if str(expected_course_id) not in perma_link: + raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") + + page = IliasPage(soup, current_parent) + if next_element := page.get_next_stage_element(): + current_parent = next_element + next_stage_url = next_element.url + else: + next_stage_url = None + + page = cast(IliasPage, page) + elements.extend(page.get_child_elements()) + if current_element is None and (info_tab := page.get_info_tab()): + elements.append(info_tab) + if description_string := page.get_description(): + description.append(description_string) + + # Fill up our task list with the found elements + await gather_elements() + + if description: + await self._download_description(cl.path, description[0]) + + elements.sort(key=lambda e: e.id()) + + tasks: list[Awaitable[None]] = [] + for element in elements: + if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): + tasks.append(asyncio.create_task(handle)) + + # And execute them + await self.gather(tasks) + + # These decorators only apply *to this method* and *NOT* to the returned + # awaitables! + # This method does not await the handlers but returns them instead. + # This ensures one level is handled at a time and name deduplication + # works correctly. + @anoncritical + async def _handle_ilias_element( + self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False + ) -> Optional[Coroutine[Any, Any, None]]: + # element.name might contain `/` if the crawler created nested elements, + # so we can not sanitize it here. We trust in the output dir to thwart worst-case + # directory escape attacks. + element_path = PurePath(parent_path, element.name) + + # This is symptomatic of no access to the element, for example, because + # of time availability restrictions. 
+ if "cmdClass=ilInfoScreenGUI" in element.url and "cmd=showSummary" in element.url: + log.explain( + "Skipping element as url points to info screen, " + "this should only happen with not-yet-released elements" + ) + return None + + if element.type in _VIDEO_ELEMENTS and not self._videos: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](enable with option 'videos')", + ) + return None + + if element.type == IliasElementType.FILE: + return await self._handle_file(element, element_path) + elif element.type == IliasElementType.FORUM: + if not self._forums: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](enable with option 'forums')", + ) + return None + return await self._handle_forum(element, element_path) + elif element.type == IliasElementType.TEST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](tests contain no relevant data)", + ) + return None + elif element.type == IliasElementType.SURVEY: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](surveys contain no relevant data)", + ) + return None + elif element.type == IliasElementType.SCORM_LEARNING_MODULE: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](scorm learning modules are not supported)", + ) + return None + elif element.type == IliasElementType.LITERATURE_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](literature lists are not currently supported)", + ) + return None + elif element.type == IliasElementType.LEARNING_MODULE_HTML: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](HTML learning modules are not supported)", + ) + return None + elif element.type == IliasElementType.BLOG: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + 
"[bright_black](blogs are not currently supported)", + ) + return None + elif element.type == IliasElementType.DCL_RECORD_LIST: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](dcl record lists are not currently supported)", + ) + return None + elif element.type == IliasElementType.MEDIA_POOL: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](media pools are not currently supported)", + ) + return None + elif element.type == IliasElementType.COURSE: + if crawl_nested_courses: + return await self._handle_ilias_page(element.url, element, element_path) + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](not descending into linked course)", + ) + return None + elif element.type == IliasElementType.WIKI: + log.status( + "[bold bright_black]", + "Ignored", + fmt_path(element_path), + "[bright_black](wikis are not currently supported)", + ) + return None + elif element.type == IliasElementType.LEARNING_MODULE: + return await self._handle_learning_module(element, element_path) + elif element.type == IliasElementType.LINK: + return await self._handle_link(element, element_path) + elif element.type == IliasElementType.LINK_COLLECTION: + return await self._handle_link(element, element_path) + elif element.type == IliasElementType.BOOKING: + return await self._handle_booking(element, element_path) + elif element.type == IliasElementType.OPENCAST_VIDEO: + return await self._handle_file(element, element_path) + elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER: + return await self._handle_opencast_video(element, element_path) + elif element.type == IliasElementType.MEDIACAST_VIDEO: + return await self._handle_file(element, element_path) + elif element.type == IliasElementType.MOB_VIDEO: + return await self._handle_file(element, element_path, is_video=True) + elif element.type in _DIRECTORY_PAGES: + return await 
self._handle_ilias_page(element.url, element, element_path) + else: + # This will retry it a few times, failing everytime. It doesn't make any network + # requests, so that's fine. + raise CrawlWarning(f"Unknown element type: {element.type!r}") + + async def _handle_link( + self, + element: IliasPageElement, + element_path: PurePath, + ) -> Optional[Coroutine[Any, Any, None]]: + log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}") + log.explain(f"Links type is {self._links}") + + export_url = url_set_query_param(element.url, "cmd", "exportHTML") + resolved = await self._resolve_link_target(export_url) + if resolved == "none": + links = [LinkData(element.name, "", element.description or "")] + else: + links = self._parse_link_content(element, cast(BeautifulSoup, resolved)) + + maybe_extension = self._links.extension() + + if not maybe_extension: + log.explain("Answer: No") + return None + else: + log.explain("Answer: Yes") + + if len(links) <= 1 or self._links.collection_as_one(): + element_path = element_path.with_name(element_path.name + maybe_extension) + maybe_dl = await self.download(element_path, mtime=element.mtime) + if not maybe_dl: + return None + return self._download_link(self._links, element.name, links, maybe_dl) + + maybe_cl = await self.crawl(element_path) + if not maybe_cl: + return None + # Required for download_all closure + cl = maybe_cl + extension = maybe_extension + + async def download_all() -> None: + for link in links: + path = cl.path / (sanitize_path_name(link.name) + extension) + if dl := await self.download(path, mtime=element.mtime): + await self._download_link(self._links, element.name, [link], dl) + + return download_all() + + @anoncritical + @_iorepeat(3, "resolving link") + async def _download_link( + self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken + ) -> None: + async with dl as (bar, sink): + rendered = link_renderer.interpolate(self._link_file_redirect_delay, 
collection_name, links) + sink.file.write(rendered.encode("utf-8")) + sink.done() + + async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]: + async def impl() -> Optional[BeautifulSoup | Literal["none"]]: + async with self.session.get(export_url, allow_redirects=False) as resp: + # No redirect means we were authenticated + if hdrs.LOCATION not in resp.headers: + return soupify(await resp.read()) # .select_one("a").get("href").strip() # type: ignore + # We are either unauthenticated or the link is not active + new_url = resp.headers[hdrs.LOCATION].lower() + if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: + return "none" + return None + + auth_id = await self._current_auth_id() + target = await impl() + if target is not None: + return target + + await self.authenticate(auth_id) + + target = await impl() + if target is not None: + return target + + raise CrawlError("resolve_link_target failed even after authenticating") + + @staticmethod + def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]: + links = list(content.select("a")) + if len(links) == 1: + url = str(links[0].get("href")).strip() + return [LinkData(name=element.name, description=element.description or "", url=url)] + + results = [] + for link in links: + url = str(link.get("href")).strip() + name = link.get_text(strip=True) + description = cast(Tag, link.find_next_sibling("dd")).get_text(strip=True) + results.append(LinkData(name=name, description=description, url=url.strip())) + + return results + + async def _handle_booking( + self, + element: IliasPageElement, + element_path: PurePath, + ) -> Optional[Coroutine[Any, Any, None]]: + log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}") + log.explain(f"Links type is {self._links}") + + link_template_maybe = self._links.template() + link_extension = self._links.extension() + if not link_template_maybe or not link_extension: + 
log.explain("Answer: No") + return None + else: + log.explain("Answer: Yes") + element_path = element_path.with_name(element_path.name + link_extension) + + maybe_dl = await self.download(element_path, mtime=element.mtime) + if not maybe_dl: + return None + + self._ensure_not_seen(element, element_path) + + return self._download_booking(element, maybe_dl) + + @anoncritical + @_iorepeat(1, "downloading description") + async def _download_description(self, parent_path: PurePath, description: BeautifulSoup) -> None: + path = parent_path / "Description.html" + dl = await self.download(path, redownload=Redownload.ALWAYS) + if not dl: + return + + async with dl as (_bar, sink): + description = clean(insert_base_markup(description)) + description_tag = await self.internalize_images(description) + sink.file.write(description_tag.prettify().encode("utf-8")) + sink.done() + + @anoncritical + @_iorepeat(3, "resolving booking") + async def _download_booking( + self, + element: IliasPageElement, + dl: DownloadToken, + ) -> None: + async with dl as (bar, sink): + links = [LinkData(name=element.name, description=element.description or "", url=element.url)] + rendered = self._links.interpolate(self._link_file_redirect_delay, element.name, links) + sink.file.write(rendered.encode("utf-8")) + sink.done() + + async def _handle_opencast_video( + self, + element: IliasPageElement, + element_path: PurePath, + ) -> Optional[Coroutine[Any, Any, None]]: + # Copy old mapping as it is likely still relevant + if self.prev_report: + self.report.add_custom_value( + _get_video_cache_key(element), + self.prev_report.get_custom_value(_get_video_cache_key(element)), + ) + + # A video might contain other videos, so let's "crawl" the video first + # to ensure rate limits apply. This must be a download as *this token* + # is re-used if the video consists of a single stream. 
In that case the + # file name is used and *not* the stream name the ilias html parser reported + # to ensure backwards compatibility. + maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS) + + # If we do not want to crawl it (user filter), we can move on + if not maybe_dl: + return None + + self._ensure_not_seen(element, element_path) + + # If we have every file from the cached mapping already, we can ignore this and bail + if self._all_opencast_videos_locally_present(element, maybe_dl.path): + # Mark all existing videos as known to ensure they do not get deleted during cleanup. + # We "downloaded" them, just without actually making a network request as we assumed + # they did not change. + contained = self._previous_contained_opencast_videos(element, maybe_dl.path) + if len(contained) > 1: + # Only do this if we threw away the original dl token, + # to not download single-stream videos twice + for video in contained: + await self.download(video) + + return None + + return self._download_opencast_video(element, maybe_dl) + + def _previous_contained_opencast_videos( + self, element: IliasPageElement, element_path: PurePath + ) -> list[PurePath]: + if not self.prev_report: + return [] + custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) + if not custom_value: + return [] + cached_value = cast(dict[str, Any], custom_value) + if "known_paths" not in cached_value or "own_path" not in cached_value: + log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}") + return [] + transformed_own_path = self._transformer.transform(element_path) + if cached_value["own_path"] != str(transformed_own_path): + log.explain( + f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}" + ) + return [] + return [PurePath(name) for name in cached_value["known_paths"]] + + def _all_opencast_videos_locally_present(self, element: IliasPageElement, 
element_path: PurePath) -> bool: + log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}") + if contained_videos := self._previous_contained_opencast_videos(element, element_path): + log.explain( + f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}" + ) + if all(self._output_dir.resolve(path).exists() for path in contained_videos): + log.explain("Found all known videos locally, skipping enumeration request") + return True + log.explain("Missing at least one video, continuing with requests!") + else: + log.explain("No local cache present") + return False + + @anoncritical + @_iorepeat(3, "downloading video") + async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None: + def add_to_report(paths: list[str]) -> None: + self.report.add_custom_value( + _get_video_cache_key(element), + {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}, + ) + + async with dl as (bar, sink): + page = IliasPage(await self._get_page(element.url), element) + stream_elements = page.get_child_elements() + + if len(stream_elements) > 1: + log.explain(f"Found multiple video streams for {element.name}") + else: + log.explain(f"Using single video mode for {element.name}") + stream_element = stream_elements[0] + + # We do not have a local cache yet + await self._stream_from_url(stream_element, sink, bar, is_video=True) + add_to_report([str(self._transformer.transform(dl.path))]) + return + + contained_video_paths: list[str] = [] + + for stream_element in stream_elements: + video_path = dl.path.parent / stream_element.name + + maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER) + if not maybe_dl: + continue + async with maybe_dl as (bar, sink): + log.explain(f"Streaming video from real url {stream_element.url}") + contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) + await self._stream_from_url(stream_element, 
sink, bar, is_video=True) + + add_to_report(contained_video_paths) + + async def _handle_file( + self, + element: IliasPageElement, + element_path: PurePath, + is_video: bool = False, + ) -> Optional[Coroutine[Any, Any, None]]: + maybe_dl = await self.download(element_path, mtime=element.mtime) + if not maybe_dl: + return None + self._ensure_not_seen(element, element_path) + + return self._download_file(element, maybe_dl, is_video) + + @_iorepeat(3, "downloading file") + @anoncritical + async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: + assert dl # The function is only reached when dl is not None + async with dl as (bar, sink): + await self._stream_from_url(element, sink, bar, is_video) + + async def _stream_from_url( + self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool + ) -> None: + url = element.url + + async def try_stream() -> bool: + next_url = url + # Normal files redirect to the magazine if we are not authenticated. As files could be HTML, + # we can not match on the content type here. Instead, we disallow redirects and inspect the + # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume + # our authentication expired. + if not is_video: + async with self.session.get(url, allow_redirects=False) as resp: + # Redirect to anything except a "sendfile" means we weren't authenticated + if hdrs.LOCATION in resp.headers: + if "&cmd=sendfile" not in resp.headers[hdrs.LOCATION]: + return False + # Directly follow the redirect to not make a second, unnecessary request + next_url = resp.headers[hdrs.LOCATION] + + # Let's try this again and follow redirects + return await fetch_follow_redirects(next_url) + + async def fetch_follow_redirects(file_url: str) -> bool: + async with self.session.get(file_url) as resp: + # We wanted a video but got HTML => Forbidden, auth expired. 
Logging in won't really + # solve that depending on the setup, but it is better than nothing. + if is_video and "html" in resp.content_type: + return False + + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range + if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None): + parts = content_range.split("/") + if len(parts) == 2 and parts[1].isdigit(): + bar.set_total(int(parts[1])) + + # Prefer the content length header + if resp.content_length: + bar.set_total(resp.content_length) + + async for data in resp.content.iter_chunked(1024): + sink.file.write(data) + bar.advance(len(data)) + + sink.done() + return True + + auth_id = await self._current_auth_id() + if await try_stream(): + return + + await self.authenticate(auth_id) + + if not await try_stream(): + raise CrawlError(f"File streaming failed after authenticate() {element!r}") + + async def _handle_forum( + self, + element: IliasPageElement, + element_path: PurePath, + ) -> Optional[Coroutine[Any, Any, None]]: + maybe_cl = await self.crawl(element_path) + if not maybe_cl: + return None + return self._crawl_forum(element, maybe_cl) + + @_iorepeat(3, "crawling forum") + @anoncritical + async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: + async with cl: + inner = IliasPage(await self._get_page(element.url), element) + export_url = inner.get_forum_export_url() + if not export_url: + log.warn("Could not extract forum export url") + return + + export = await self._post( + export_url, + {"format": "html", "cmd[createExportFile]": ""}, + ) + + elements = parse_ilias_forum_export(soupify(export)) + + tasks: list[Awaitable[None]] = [] + for thread in elements: + tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) + + # And execute them + await self.gather(tasks) + + @anoncritical + @_iorepeat(3, "saving forum thread") + async def _download_forum_thread( + self, parent_path: PurePath, thread: IliasForumThread | 
IliasPageElement, forum_url: str + ) -> None: + path = parent_path / (sanitize_path_name(thread.name) + ".html") + maybe_dl = await self.download(path, mtime=thread.mtime) + if not maybe_dl or not isinstance(thread, IliasForumThread): + return + + async with maybe_dl as (bar, sink): + rendered = forum_thread_template( + thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag) + ) + sink.file.write(rendered.encode("utf-8")) + sink.done() + + async def _handle_learning_module( + self, + element: IliasPageElement, + element_path: PurePath, + ) -> Optional[Coroutine[Any, Any, None]]: + maybe_cl = await self.crawl(element_path) + if not maybe_cl: + return None + self._ensure_not_seen(element, element_path) + + return self._crawl_learning_module(element, maybe_cl) + + @_iorepeat(3, "crawling learning module") + @anoncritical + async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None: + elements: list[IliasLearningModulePage] = [] + + async with cl: + log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") + log.explain(f"URL: {element.url}") + soup = await self._get_page(element.url) + page = IliasPage(soup, element) + if next := page.get_learning_module_data(): + elements.extend( + await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element) + ) + elements.append(next) + elements.extend( + await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element) + ) + + # Reflect their natural ordering in the file names + for index, lm_element in enumerate(elements): + lm_element.title = f"{index:02}_{lm_element.title}" + + tasks: list[Awaitable[None]] = [] + for index, elem in enumerate(elements): + prev_url = elements[index - 1].title if index > 0 else None + next_url = elements[index + 1].title if index < len(elements) - 1 else None + tasks.append( + asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url)) + ) + 
+ # And execute them + await self.gather(tasks) + + async def _crawl_learning_module_direction( + self, + path: PurePath, + start_url: Optional[str], + dir: Literal["left"] | Literal["right"], + parent_element: IliasPageElement, + ) -> list[IliasLearningModulePage]: + elements: list[IliasLearningModulePage] = [] + + if not start_url: + return elements + + next_element_url: Optional[str] = start_url + counter = 0 + while next_element_url: + log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") + log.explain(f"URL: {next_element_url}") + soup = await self._get_page(next_element_url) + page = IliasPage(soup, parent_element) + if next := page.get_learning_module_data(): + elements.append(next) + next_element_url = next.previous_url if dir == "left" else next.next_url + counter += 1 + + return elements + + @anoncritical + @_iorepeat(3, "saving learning module page") + async def _download_learning_module_page( + self, + parent_path: PurePath, + element: IliasLearningModulePage, + prev: Optional[str], + next: Optional[str], + ) -> None: + path = parent_path / (sanitize_path_name(element.title) + ".html") + maybe_dl = await self.download(path) + if not maybe_dl: + return + my_path = self._transformer.transform(maybe_dl.path) + if not my_path: + return + + if prev: + prev_p = self._transformer.transform(parent_path / (sanitize_path_name(prev) + ".html")) + prev = os.path.relpath(prev_p, my_path.parent) if prev_p else None + if next: + next_p = self._transformer.transform(parent_path / (sanitize_path_name(next) + ".html")) + next = os.path.relpath(next_p, my_path.parent) if next_p else None + + async with maybe_dl as (bar, sink): + content = element.content + content = await self.internalize_images(content) + sink.file.write(learning_module_template(content, maybe_dl.path.name, prev, next).encode("utf-8")) + sink.done() + + async def internalize_images(self, tag: Tag) -> Tag: + """ + Tries to fetch ILIAS images and embed them as base64 data. 
+ """ + log.explain_topic("Internalizing images") + for elem in tag.find_all(recursive=True): + if elem.name == "img" and (src := elem.attrs.get("src", None)): + url = urljoin(self._base_url, cast(str, src)) + if not url.startswith(self._base_url): + continue + log.explain(f"Internalizing {url!r}") + img = await self._get_authenticated(url) + elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() + if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"): + # For unknown reasons the protocol seems to be stripped. + elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"]) + return tag + + def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None: + if element.url in self._visited_urls: + raise CrawlWarning( + f"Found second path to element {element.name!r} at {element.url!r}. " + + f"First path: {fmt_path(self._visited_urls[element.url])}. " + + f"Second path: {fmt_path(parent_path)}." + ) + self._visited_urls[element.url] = parent_path + + async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup: + auth_id = await self._current_auth_id() + async with self.session.get(url) as request: + soup = IliasSoup(soupify(await request.read()), str(request.url)) + if IliasPage.is_logged_in(soup): + return self._verify_page(soup, url, root_page_allowed) + + # We weren't authenticated, so try to do that + await self.authenticate(auth_id) + + # Retry once after authenticating. If this fails, we will die. 
+ async with self.session.get(url) as request: + soup = IliasSoup(soupify(await request.read()), str(request.url)) + if IliasPage.is_logged_in(soup): + return self._verify_page(soup, url, root_page_allowed) + raise CrawlError(f"get_page failed even after authenticating on {url!r}") + + @staticmethod + def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup: + if IliasPage.is_root_page(soup) and not root_page_allowed: + raise CrawlError( + "Unexpectedly encountered ILIAS root page. " + "This usually happens because the ILIAS instance is broken. " + "If so, wait a day or two and try again. " + "It could also happen because a crawled element links to the ILIAS root page. " + "If so, use a transform with a ! as target to ignore the particular element. " + f"The redirect came from {url}" + ) + return soup + + async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes: + form_data = aiohttp.FormData() + for key, val in data.items(): + form_data.add_field(key, val) + + async with self.session.post(url, data=form_data()) as request: + if request.status == 200: + return await request.read() + raise CrawlError(f"post failed with status {request.status}") + + async def _get_authenticated(self, url: str) -> bytes: + auth_id = await self._current_auth_id() + + async with self.session.get(url, allow_redirects=False) as request: + if request.status == 200: + return await request.read() + + # We weren't authenticated, so try to do that + await self.authenticate(auth_id) + + # Retry once after authenticating. If this fails, we will die. 
+ async with self.session.get(url, allow_redirects=False) as request: + if request.status == 200: + return await request.read() + raise CrawlError("get_authenticated failed even after authenticating") + + async def _authenticate(self) -> None: + # fill the session with the correct cookies + if self._login_type == "shibboleth": + await self._shibboleth_login.login(self.session) + else: + params = { + "client_id": self._client_id, + "cmd": "force_login", + } + async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: + login_page = soupify(await request.read()) + + login_form = login_page.find("form", attrs={"name": "login_form"}) + if login_form is None: + raise CrawlError("Could not find the login form! Specified client id might be invalid.") + + login_url = cast(Optional[str], login_form.attrs.get("action")) + if login_url is None: + raise CrawlError("Could not find the action URL in the login form!") + + username, password = await self._auth.credentials() + + login_form_data = aiohttp.FormData() + login_form_data.add_field("login_form/input_3/input_4", username) + login_form_data.add_field("login_form/input_3/input_5", password) + + # do the actual login + async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request: + soup = IliasSoup(soupify(await request.read()), str(request.url)) + if not IliasPage.is_logged_in(soup): + self._auth.invalidate_credentials() diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 4cfec9b..5966141 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -1,39 +1,245 @@ import json import re +from collections.abc import Callable from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Dict, List, Optional, Union, cast +from typing import Optional, cast from urllib.parse import urljoin, urlparse from bs4 import 
BeautifulSoup, Tag +from PFERD.crawl import CrawlError +from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log -from PFERD.utils import url_set_query_params +from PFERD.utils import sanitize_path_name, url_set_query_params -TargetType = Union[str, int] +TargetType = str | int + + +class TypeMatcher: + class UrlPath: + path: str + + def __init__(self, path: str): + self.path = path + + class UrlParameter: + query: str + + def __init__(self, query: str): + self.query = query + + class ImgSrc: + src: str + + def __init__(self, src: str): + self.src = src + + class ImgAlt: + alt: str + + def __init__(self, alt: str): + self.alt = alt + + class All: + matchers: list["IliasElementMatcher"] + + def __init__(self, matchers: list["IliasElementMatcher"]): + self.matchers = matchers + + class Any: + matchers: list["IliasElementMatcher"] + + def __init__(self, matchers: list["IliasElementMatcher"]): + self.matchers = matchers + + @staticmethod + def path(path: str) -> UrlPath: + return TypeMatcher.UrlPath(path) + + @staticmethod + def query(query: str) -> UrlParameter: + return TypeMatcher.UrlParameter(query) + + @staticmethod + def img_src(src: str) -> ImgSrc: + return TypeMatcher.ImgSrc(src) + + @staticmethod + def img_alt(alt: str) -> ImgAlt: + return TypeMatcher.ImgAlt(alt) + + @staticmethod + def all(*matchers: "IliasElementMatcher") -> All: + return TypeMatcher.All(list(matchers)) + + @staticmethod + def any(*matchers: "IliasElementMatcher") -> Any: + return TypeMatcher.Any(list(matchers)) + + @staticmethod + def never() -> Any: + return TypeMatcher.Any([]) + + +IliasElementMatcher = ( + TypeMatcher.UrlPath + | TypeMatcher.UrlParameter + | TypeMatcher.ImgSrc + | TypeMatcher.ImgAlt + | TypeMatcher.All + | TypeMatcher.Any +) class IliasElementType(Enum): - EXERCISE = "exercise" + BLOG = "blog" + BOOKING = "booking" + COURSE = "course" + DCL_RECORD_LIST = "dcl_record_list" + EXERCISE_OVERVIEW = "exercise_overview" + EXERCISE = "exercise" # own submitted 
files EXERCISE_FILES = "exercise_files" # own submitted files - TEST = "test" # an online test. Will be ignored currently. FILE = "file" FOLDER = "folder" FORUM = "forum" - LINK = "link" + FORUM_THREAD = "forum_thread" INFO_TAB = "info_tab" LEARNING_MODULE = "learning_module" - BOOKING = "booking" - MEETING = "meeting" - SURVEY = "survey" - SCORM_LEARNING_MODULE = "scorm_learning_module" - MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + LEARNING_MODULE_HTML = "learning_module_html" + LITERATURE_LIST = "literature_list" + LINK = "link" + LINK_COLLECTION = "link_collection" + MEDIA_POOL = "media_pool" MEDIACAST_VIDEO = "mediacast_video" + MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + MEETING = "meeting" + MOB_VIDEO = "mob_video" OPENCAST_VIDEO = "opencast_video" - OPENCAST_VIDEO_PLAYER = "opencast_video_player" OPENCAST_VIDEO_FOLDER = "opencast_video_folder" OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated" + OPENCAST_VIDEO_PLAYER = "opencast_video_player" + SCORM_LEARNING_MODULE = "scorm_learning_module" + SURVEY = "survey" + TEST = "test" # an online test. Will be ignored currently. 
+ WIKI = "wiki" + + def matcher(self) -> IliasElementMatcher: + match self: + case IliasElementType.BLOG: + return TypeMatcher.any(TypeMatcher.img_src("_blog.svg")) + case IliasElementType.BOOKING: + return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg")) + case IliasElementType.COURSE: + return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg")) + case IliasElementType.DCL_RECORD_LIST: + return TypeMatcher.any( + TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui") + ) + case IliasElementType.EXERCISE: + return TypeMatcher.never() + case IliasElementType.EXERCISE_FILES: + return TypeMatcher.never() + case IliasElementType.EXERCISE_OVERVIEW: + return TypeMatcher.any( + TypeMatcher.path("/exc/"), + TypeMatcher.path("_exc_"), + TypeMatcher.img_src("_exc.svg"), + ) + case IliasElementType.FILE: + return TypeMatcher.any( + TypeMatcher.query("cmd=sendfile"), + TypeMatcher.path("_file_"), + TypeMatcher.img_src("/filedelivery/"), + ) + case IliasElementType.FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/fold/"), + TypeMatcher.img_src("_fold.svg"), + TypeMatcher.path("/grp/"), + TypeMatcher.img_src("_grp.svg"), + TypeMatcher.path("/copa/"), + TypeMatcher.path("_copa_"), + TypeMatcher.img_src("_copa.svg"), + # Not supported right now but warn users + # TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), + # TypeMatcher.img_alt("medienpool"), + # TypeMatcher.img_src("_mep.svg"), + ) + case IliasElementType.FORUM: + return TypeMatcher.any( + TypeMatcher.path("/frm/"), + TypeMatcher.path("_frm_"), + TypeMatcher.img_src("_frm.svg"), + ) + case IliasElementType.FORUM_THREAD: + return TypeMatcher.never() + case IliasElementType.INFO_TAB: + return TypeMatcher.never() + case IliasElementType.LITERATURE_LIST: + return TypeMatcher.img_src("_bibl.svg") + case IliasElementType.LEARNING_MODULE: + return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg")) + case 
IliasElementType.LEARNING_MODULE_HTML: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg") + ) + case IliasElementType.LINK: + return TypeMatcher.any( + TypeMatcher.all( + TypeMatcher.query("baseclass=illinkresourcehandlergui"), + TypeMatcher.query("calldirectlink"), + ), + TypeMatcher.img_src("_webr.svg"), # duplicated :( + ) + case IliasElementType.LINK_COLLECTION: + return TypeMatcher.any( + TypeMatcher.query("baseclass=illinkresourcehandlergui"), + TypeMatcher.img_src("_webr.svg"), # duplicated :( + ) + case IliasElementType.MEDIA_POOL: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg") + ) + case IliasElementType.MEDIACAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.MEDIACAST_VIDEO_FOLDER: + return TypeMatcher.any( + TypeMatcher.path("/mcst/"), + TypeMatcher.query("baseclass=ilmediacasthandlergui"), + TypeMatcher.img_src("_mcst.svg"), + ) + case IliasElementType.MEETING: + return TypeMatcher.any(TypeMatcher.img_src("_sess.svg")) + case IliasElementType.MOB_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER: + return TypeMatcher.never() + case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: + return TypeMatcher.img_alt("opencast") + case IliasElementType.OPENCAST_VIDEO_PLAYER: + return TypeMatcher.never() + case IliasElementType.SCORM_LEARNING_MODULE: + return TypeMatcher.any( + TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg") + ) + case IliasElementType.SURVEY: + return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg")) + case IliasElementType.TEST: + return TypeMatcher.any( + TypeMatcher.query("cmdclass=ilobjtestgui"), + TypeMatcher.query("cmdclass=iltestscreengui"), + TypeMatcher.img_src("_tst.svg"), + ) + case IliasElementType.WIKI: + return 
TypeMatcher.any(
+                    TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg")
+                )
+
+        raise CrawlWarning(f"Unknown matcher {self}")
 
 
 @dataclass
@@ -47,10 +253,25 @@ class IliasPageElement:
     def id(self) -> str:
         regexes = [
             r"eid=(?P<id>[0-9a-z\-]+)",
-            r"file_(?P<id>\d+)",
+            r"book/(?P<id>\d+)",  # booking
+            r"cat/(?P<id>\d+)",
+            r"copa/(?P<id>\d+)",  # content page
+            r"crs/(?P<id>\d+)",  # course
+            r"exc/(?P<id>\d+)",  # exercise
+            r"file/(?P<id>\d+)",  # file
+            r"fold/(?P<id>\d+)",  # folder
+            r"frm/(?P<id>\d+)",  # forum
+            r"grp/(?P<id>\d+)",  # group
+            r"lm/(?P<id>\d+)",  # learning module
+            r"mcst/(?P<id>\d+)",  # mediacast
+            r"pg/(?P<id>(\d|_)+)",  # page?
+            r"svy/(?P<id>\d+)",  # survey
+            r"sess/(?P<id>\d+)",  # session
+            r"webr/(?P<id>\d+)",  # web reference (link)
+            r"thr_pk=(?P<id>\d+)",  # forums
             r"ref_id=(?P<id>\d+)",
             r"target=[a-z]+_(?P<id>\d+)",
-            r"mm_(?P<id>\d+)"
+            r"mm_(?P<id>\d+)",
         ]
 
         for regex in regexes:
@@ -61,18 +282,64 @@ class IliasPageElement:
         log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.")
         return self.url
 
+    @staticmethod
+    def create_new(
+        typ: IliasElementType,
+        url: str,
+        name: str,
+        mtime: Optional[datetime] = None,
+        description: Optional[str] = None,
+        skip_sanitize: bool = False,
+    ) -> "IliasPageElement":
+        if typ == IliasElementType.MEETING:
+            normalized = IliasPageElement._normalize_meeting_name(name)
+            log.explain(f"Normalized meeting name from {name!r} to {normalized!r}")
+            name = normalized
+
+        if not skip_sanitize:
+            name = sanitize_path_name(name)
+
+        return IliasPageElement(typ, url, name, mtime, description)
+
+    @staticmethod
+    def _normalize_meeting_name(meeting_name: str) -> str:
+        """
+        Normalizes meeting names, which have a relative time as their first part,
+        to their date in ISO format.
+        """
+
+        # This checks whether we can reach a `:` without passing a `-`
+        if re.search(r"^[^-]+: ", meeting_name):  # noqa: SIM108
+            # Meeting name only contains date: "05. Jan 2000:"
+            split_delimiter = ":"
+        else:
+            # Meeting name contains date and start/end times: "05. 
Jan 2000, 16:00 - 17:30:" + split_delimiter = ", " + + # We have a meeting day without time + date_portion_str = meeting_name.split(split_delimiter)[0] + date_portion = demangle_date(date_portion_str) + + # We failed to parse the date, bail out + if not date_portion: + return meeting_name + + # Replace the first section with the absolute date + rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:]) + return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name + @dataclass class IliasDownloadForumData: url: str - form_data: Dict[str, Union[str, List[str]]] + form_data: dict[str, str | list[str]] empty: bool @dataclass class IliasForumThread: - title: str - title_tag: Tag + name: str + name_tag: Tag content_tag: Tag mtime: Optional[datetime] @@ -85,21 +352,30 @@ class IliasLearningModulePage: previous_url: Optional[str] -class IliasPage: +class IliasSoup: + soup: BeautifulSoup + page_url: str - def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): - self._soup = soup - self._page_url = _page_url + def __init__(self, soup: BeautifulSoup, page_url: str): + self.soup = soup + self.page_url = page_url + + +class IliasPage: + def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]): + self._ilias_soup = ilias_soup + self._soup = ilias_soup.soup + self._page_url = ilias_soup.page_url self._page_type = source_element.type if source_element else None self._source_name = source_element.name if source_element else "" @staticmethod - def is_root_page(soup: BeautifulSoup) -> bool: + def is_root_page(soup: IliasSoup) -> bool: if permalink := IliasPage.get_soup_permalink(soup): - return "goto.php?target=root_" in permalink + return "goto.php/root/" in permalink return False - def get_child_elements(self) -> List[IliasPageElement]: + def get_child_elements(self) -> list[IliasPageElement]: """ Return all child page elements you can find here. 
""" @@ -126,22 +402,25 @@ class IliasPage: def get_info_tab(self) -> Optional[IliasPageElement]: tab: Optional[Tag] = self._soup.find( - name="a", - attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x} + name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x} ) if tab is not None: - return IliasPageElement( - IliasElementType.INFO_TAB, - self._abs_url_from_link(tab), - "infos" + return IliasPageElement.create_new( + IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos" ) return None def get_description(self) -> Optional[BeautifulSoup]: - def is_interesting_class(name: str) -> bool: - return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] + def is_interesting_class(name: str | None) -> bool: + return name in [ + "ilCOPageSection", + "ilc_Paragraph", + "ilc_va_ihcap_VAccordIHeadCap", + "ilc_va_ihcap_AccordIHeadCap", + "ilc_media_cont_MediaContainer", + ] - paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class) + paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class)) if not paragraphs: return None @@ -152,6 +431,20 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue + if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")): + # We have an embedded video which should be downloaded by _find_mob_videos + url, title = self._find_mob_video_url_title(video, p) + raw_html += '
External Video: {title}' + else: + raw_html += f"Video elided. Filename: '{title}'." + raw_html += "
\n" + continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -165,13 +458,13 @@ class IliasPage: def get_learning_module_data(self) -> Optional[IliasLearningModulePage]: if not self._is_learning_module_page(): return None - content = self._soup.select_one("#ilLMPageContent") - title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip() + content = cast(Tag, self._soup.select_one("#ilLMPageContent")) + title = cast(Tag, self._soup.select_one(".ilc_page_title_PageTitle")).get_text().strip() return IliasLearningModulePage( title=title, content=content, next_url=self._find_learning_module_next(), - previous_url=self._find_learning_module_prev() + previous_url=self._find_learning_module_prev(), ) def _find_learning_module_next(self) -> Optional[str]: @@ -190,29 +483,28 @@ class IliasPage: return url return None - def get_download_forum_data(self) -> Optional[IliasDownloadForumData]: - form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x}) - if not form: + def get_forum_export_url(self) -> Optional[str]: + forum_link = self._soup.select_one("#tab_forums_threads > a") + if not forum_link: + log.explain("Found no forum link") return None - post_url = self._abs_url_from_relative(form["action"]) - thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})] + base_url = self._abs_url_from_link(forum_link) + base_url = re.sub(r"cmd=\w+", "cmd=post", base_url) + base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url) - form_data: Dict[str, Union[str, List[str]]] = { - "thread_ids[]": thread_ids, - "selected_cmd2": "html", - "select_cmd2": "Ausführen", - "selected_cmd": "", - } + rtoken_form = self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}) + if not rtoken_form: + log.explain("Found no rtoken anywhere") + return None + match = cast(re.Match[str], re.search(r"rtoken=(\w+)", 
str(rtoken_form.attrs["action"]))) + rtoken = match.group(1) - return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0) + base_url = base_url + "&rtoken=" + rtoken + + return base_url def get_next_stage_element(self) -> Optional[IliasPageElement]: - if self._is_forum_page(): - if "trows=800" in self._page_url: - return None - log.explain("Requesting *all* forum threads") - return self._get_show_max_forum_entries_per_page_url() if self._is_ilias_opencast_embedding(): log.explain("Unwrapping opencast embedding") return self.get_child_elements()[0] @@ -222,6 +514,8 @@ class IliasPage: if self._contains_collapsed_future_meetings(): log.explain("Requesting *all* future meetings") return self._uncollapse_future_meetings_url() + if self._is_exercise_not_all_shown(): + return self._show_all_exercises() if not self._is_content_tab_selected(): if self._page_type != IliasElementType.INFO_TAB: log.explain("Selecting content tab") @@ -230,13 +524,6 @@ class IliasPage: log.explain("Crawling info tab, skipping content select") return None - def _is_forum_page(self) -> bool: - read_more_btn = self._soup.find( - "button", - attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x} - ) - return read_more_btn is not None - def _is_video_player(self) -> bool: return "paella_config_file" in str(self._soup) @@ -245,38 +532,36 @@ class IliasPage: return True # Raw listing without ILIAS fluff - video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) + video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) return video_element_table is not None def _is_ilias_opencast_embedding(self) -> bool: # ILIAS fluff around the real opencast html if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "opencast" in element.attrs["src"].lower(): + element: Tag = cast(Tag, self._soup.find(id="headerimage")) + if "opencast" in cast(str, 
element.attrs["src"]).lower(): return True return False def _is_exercise_file(self) -> bool: # we know it from before - if self._page_type == IliasElementType.EXERCISE: + if self._page_type == IliasElementType.EXERCISE_OVERVIEW: return True # We have no suitable parent - let's guesss if self._soup.find(id="headerimage"): - element: Tag = self._soup.find(id="headerimage") - if "exc" in element.attrs["src"].lower(): + element: Tag = cast(Tag, self._soup.find(id="headerimage")) + if "exc" in cast(str, element.attrs["src"]).lower(): return True return False def _is_personal_desktop(self) -> bool: - return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}) + return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower() def _is_content_page(self) -> bool: if link := self.get_permalink(): - return "target=copa_" in link + return "/copa/" in link return False def _is_learning_module_page(self) -> bool: @@ -290,12 +575,22 @@ class IliasPage: def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]: element = self._soup.find( "a", - attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)} + attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}, ) if not element: return None link = self._abs_url_from_link(element) - return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings") + return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings") + + def _is_exercise_not_all_shown(self) -> bool: + return ( + self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower() + ) + + def _show_all_exercises(self) -> Optional[IliasPageElement]: + return IliasPageElement.create_new( + IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises" + ) def _is_content_tab_selected(self) -> bool: return self._select_content_page_url() is None @@ 
-304,33 +599,33 @@ class IliasPage: might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None return self._page_type == IliasElementType.INFO_TAB and might_be_info + def _is_course_overview_page(self) -> bool: + return "baseClass=ilmembershipoverviewgui" in self._page_url + def _select_content_page_url(self) -> Optional[IliasPageElement]: tab = self._soup.find( - id="tab_view_content", - attrs={"class": lambda x: x is not None and "active" not in x} + id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x} ) # Already selected (or not found) if not tab: return None link = tab.find("a") if link: - link = self._abs_url_from_link(link) - return IliasPageElement(IliasElementType.FOLDER, link, "select content page") + link_str = self._abs_url_from_link(link) + return IliasPageElement.create_new(IliasElementType.FOLDER, link_str, "select content page") _unexpected_html_warning() log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.") log.warn_contd("PFERD might not find content on the course's main page.") return None - def _player_to_video(self) -> List[IliasPageElement]: + def _player_to_video(self) -> list[IliasPageElement]: # Fetch the actual video page. This is a small wrapper page initializing a javscript # player. Sadly we can not execute that JS. The actual video stream url is nowhere # on the page, but defined in a JS object inside a script tag, passed to the player # library. # We do the impossible and RegEx the stream JSON object out of the page's HTML source - regex = re.compile( - r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE - ) + regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE) json_match = regex.search(str(self._soup)) if json_match is None: @@ -345,95 +640,120 @@ class IliasPage: # and just fetch the lone video url! 
if len(streams) == 1: video_url = streams[0]["sources"]["mp4"][0]["src"] - return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)] + return [ + IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name) + ] log.explain(f"Found multiple videos for stream at {self._source_name}") items = [] for stream in sorted(streams, key=lambda stream: stream["content"]): full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4" video_url = stream["sources"]["mp4"][0]["src"] - items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) + items.append(IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) return items - def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]: + def _get_show_max_forum_entries_per_page_url( + self, wanted_max: Optional[int] = None + ) -> Optional[IliasPageElement]: correct_link = self._soup.find( - "a", - attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x} + "a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x} ) if not correct_link: return None link = self._abs_url_from_link(correct_link) + if wanted_max is not None: + link = link.replace("trows=800", f"trows={wanted_max}") - return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads") + return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads") - def _find_personal_desktop_entries(self) -> List[IliasPageElement]: - items: List[IliasPageElement] = [] + def _get_forum_thread_count(self) -> Optional[int]: + log.explain_topic("Trying to find forum thread count") - titles: List[Tag] = self._soup.select(".il-item-title") + candidates = cast(list[Tag], self._soup.select(".ilTableFootLight")) + extract_regex = re.compile(r"\s(?P\d+)\s*\)") + + for candidate in candidates: + log.explain(f"Found thread count candidate: 
{candidate}") + if match := extract_regex.search(candidate.get_text()): + return int(match.group("max")) + else: + log.explain("Found no candidates to extract thread count from") + + return None + + def _find_personal_desktop_entries(self) -> list[IliasPageElement]: + items: list[IliasPageElement] = [] + + titles: list[Tag] = self._soup.select("#block_pditems_0 .il-item-title") for title in titles: link = title.find("a") - name = _sanitize_path_name(link.text.strip()) + + if not link: + log.explain(f"Skipping offline item: {title.get_text().strip()!r}") + continue + + name = sanitize_path_name(link.text.strip()) url = self._abs_url_from_link(link) if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url: # Configure button/link does not have anything interesting continue - type = self._find_type_from_link(name, link, url) - if not type: + typ = IliasPage._find_type_for_element( + name, url, lambda: IliasPage._find_icon_for_folder_entry(cast(Tag, link)) + ) + if not typ: _unexpected_html_warning() log.warn_contd(f"Could not extract type for {link}") continue - log.explain(f"Found {name!r}") + log.explain(f"Found {name!r} of type {typ}") - if type == IliasElementType.FILE and "_download" not in url: - url = re.sub(r"(target=file_\d+)", r"\1_download", url) - log.explain("Rewired file URL to include download part") - - items.append(IliasPageElement(type, url, name)) + items.append(IliasPageElement.create_new(typ, url, name)) return items - def _find_copa_entries(self) -> List[IliasPageElement]: - items: List[IliasPageElement] = [] - links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink") + def _find_copa_entries(self) -> list[IliasPageElement]: + items: list[IliasPageElement] = [] + links: list[Tag] = cast(list[Tag], self._soup.find_all(class_="ilc_flist_a_FileListItemLink")) for link in links: url = self._abs_url_from_link(link) - name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "") - name = 
_sanitize_path_name(name) + name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.get_text()).strip().replace("\t", "") + name = sanitize_path_name(name) if "file_id" not in url: _unexpected_html_warning() log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}") continue - items.append(IliasPageElement(IliasElementType.FILE, url, name)) + items.append(IliasPageElement.create_new(IliasElementType.FILE, url, name)) return items - def _find_info_tab_entries(self) -> List[IliasPageElement]: + def _find_info_tab_entries(self) -> list[IliasPageElement]: items = [] - links: List[Tag] = self._soup.select("a.il_ContainerItemCommand") + links: list[Tag] = self._soup.select("a.il_ContainerItemCommand") for link in links: - if "cmdClass=ilobjcoursegui" not in link["href"]: + log.explain(f"Found info tab link: {self._abs_url_from_link(link)}") + if "cmdclass=ilobjcoursegui" not in cast(str, link["href"]).lower(): continue - if "cmd=sendfile" not in link["href"]: + if "cmd=sendfile" not in cast(str, link["href"]).lower(): continue - items.append(IliasPageElement( - IliasElementType.FILE, - self._abs_url_from_link(link), - _sanitize_path_name(link.getText()) - )) + items.append( + IliasPageElement.create_new( + IliasElementType.FILE, self._abs_url_from_link(link), sanitize_path_name(link.get_text()) + ) + ) + log.explain(f"Found {len(items)} info tab entries {items}") return items - def _find_opencast_video_entries(self) -> List[IliasPageElement]: + def _find_opencast_video_entries(self) -> list[IliasPageElement]: # ILIAS has three stages for video pages # 1. The initial dummy page without any videos. This page contains the link to the listing # 2. The video listing which might be paginated @@ -441,59 +761,58 @@ class IliasPage: # # We need to figure out where we are. 
- video_element_table: Tag = self._soup.find( - name="table", id=re.compile(r"tbl_xoct_.+") - ) + video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if video_element_table is None: # We are in stage 1 # The page is actually emtpy but contains the link to stage 2 - content_link: Tag = self._soup.select_one("#tab_series a") + content_link: Tag = cast(Tag, self._soup.select_one("#tab_series a")) url: str = self._abs_url_from_link(content_link) query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(url, query_params) log.explain("Found ILIAS video frame page, fetching actual content next") - return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")] + return [ + IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "") + ] is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: + if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER: # We are in stage 2 - try to break pagination return self._find_opencast_video_entries_paginated() return self._find_opencast_video_entries_no_paging() - def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]: - table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + def _find_opencast_video_entries_paginated(self) -> list[IliasPageElement]: + table_element = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if table_element is None: log.warn("Couldn't increase elements per page (table not found). I might miss elements.") return self._find_opencast_video_entries_no_paging() - id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) + id_match = re.match(r"tbl_xoct_(.+)", cast(str, table_element.attrs["id"])) if id_match is None: log.warn("Couldn't increase elements per page (table id not found). 
I might miss elements.") return self._find_opencast_video_entries_no_paging() table_id = id_match.group(1) - query_params = {f"tbl_xoct_{table_id}_trows": "800", - "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(self._page_url, query_params) log.explain("Disabled pagination, retrying folder as a new entry") - return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] + return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] - def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: + def _find_opencast_video_entries_no_paging(self) -> list[IliasPageElement]: """ Crawls the "second stage" video page. This page contains the actual video urls. """ # Video start links are marked with an "Abspielen" link - video_links: List[Tag] = self._soup.findAll( - name="a", text=re.compile(r"\s*(Abspielen|Play)\s*") + video_links = cast( + list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")) ) - results: List[IliasPageElement] = [] + results: list[IliasPageElement] = [] for link in video_links: results.append(self._listed_opencast_video_to_element(link)) @@ -505,12 +824,10 @@ class IliasPage: # 6th or 7th child (1 indexed) is the modification time string. 
Try to find it # by parsing backwards from the end and finding something that looks like a date modification_time = None - row: Tag = link.parent.parent.parent + row: Tag = link.parent.parent.parent # type: ignore column_count = len(row.select("td.std")) for index in range(column_count, 0, -1): - modification_string = link.parent.parent.parent.select_one( - f"td.std:nth-child({index})" - ).getText().strip() + modification_string = cast(Tag, row.select_one(f"td.std:nth-child({index})")).get_text().strip() if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string): modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M") break @@ -519,138 +836,166 @@ class IliasPage: log.warn(f"Could not determine upload time for {link}") modification_time = datetime.now() - title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() + title = cast(Tag, row.select_one("td.std:nth-child(3)")).get_text().strip() title += ".mp4" - video_name: str = _sanitize_path_name(title) + video_name: str = sanitize_path_name(title) video_url = self._abs_url_from_link(link) log.explain(f"Found video {video_name!r} at {video_url}") - return IliasPageElement( + return IliasPageElement.create_new( IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time ) - def _find_exercise_entries(self) -> List[IliasPageElement]: + def _find_exercise_entries(self) -> list[IliasPageElement]: if self._soup.find(id="tab_submission"): - log.explain("Found submission tab. This is an exercise detail page") - return self._find_exercise_entries_detail_page() + log.explain("Found submission tab. This is an exercise detail or files page") + if self._soup.select_one("#tab_submission.active") is None: + log.explain(" This is a details page") + return self._find_exercise_entries_detail_page() + else: + log.explain(" This is a files page") + return self._find_exercise_entries_files_page() + log.explain("Found no submission tab. 
This is an exercise root page") return self._find_exercise_entries_root_page() - def _find_exercise_entries_detail_page(self) -> List[IliasPageElement]: - results: List[IliasPageElement] = [] + def _find_exercise_entries_detail_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] - # Find all download links in the container (this will contain all the files) - download_links: List[Tag] = self._soup.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmd=download" in x}, - text="Download" + if link := self._soup.select_one("#tab_submission > a"): + results.append( + IliasPageElement.create_new( + IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission" + ) + ) + else: + log.explain("Found no submission link for exercise, maybe it has not started yet?") + + # Find all download links in the container (this will contain all the *feedback* files) + download_links = cast( + list[Tag], + self._soup.find_all( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x is not None and "cmd=download" in x}, + text="Download", + ), ) for link in download_links: - parent_row: Tag = link.findParent("tr") - children: List[Tag] = parent_row.findChildren("td") + parent_row: Tag = cast( + Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x}) + ) + name_tag = parent_row.find(name="div") - name = _sanitize_path_name(children[1].getText().strip()) + if not name_tag: + log.warn("Could not find name tag for exercise entry") + _unexpected_html_warning() + continue + + name = sanitize_path_name(name_tag.get_text().strip()) log.explain(f"Found exercise detail entry {name!r}") + results.append( + IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name) + ) + + return results + + def _find_exercise_entries_files_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] + + # Find all 
download links in the container + download_links = cast( + list[Tag], + self._soup.find_all( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x is not None and "cmd=download" in x}, + text="Download", + ), + ) + + for link in download_links: + parent_row: Tag = cast(Tag, link.find_parent("tr")) + children = cast(list[Tag], parent_row.find_all("td")) + + name = sanitize_path_name(children[1].get_text().strip()) + log.explain(f"Found exercise file entry {name!r}") + + date = None for child in reversed(children): - date = demangle_date(child.getText().strip(), fail_silently=True) + date = demangle_date(child.get_text().strip(), fail_silently=True) if date is not None: break if date is None: - log.warn(f"Date parsing failed for exercise entry {name!r}") + log.warn(f"Date parsing failed for exercise file entry {name!r}") - results.append(IliasPageElement( - IliasElementType.FILE, - self._abs_url_from_link(link), - name, - date - )) + results.append( + IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date) + ) return results - def _find_exercise_entries_root_page(self) -> List[IliasPageElement]: - results: List[IliasPageElement] = [] + def _find_exercise_entries_root_page(self) -> list[IliasPageElement]: + results: list[IliasPageElement] = [] - # Each assignment is in an accordion container - assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer") + content_tab = self._soup.find(id="ilContentContainer") + if not content_tab: + log.warn("Could not find content tab in exercise overview page") + _unexpected_html_warning() + return [] - for container in assignment_containers: - # Fetch the container name out of the header to use it in the path - container_name = container.select_one(".ilAssignmentHeader").getText().strip() - log.explain(f"Found exercise container {container_name!r}") + exercise_links = content_tab.select(".il-item-title a") - # Find all download links 
in the container (this will contain all the files) - files: List[Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, - text="Download" + for exercise in cast(list[Tag], exercise_links): + if "href" not in exercise.attrs: + continue + href = exercise.attrs["href"] + if type(href) is not str: + continue + if "ass_id=" not in href or "cmdclass=ilassignmentpresentationgui" not in href.lower(): + continue + + name = sanitize_path_name(exercise.get_text().strip()) + results.append( + IliasPageElement.create_new( + IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name + ) ) - # Grab each file as you now have the link - for file_link in files: - # Two divs, side by side. Left is the name, right is the link ==> get left - # sibling - file_name = file_link.parent.findPrevious(name="div").getText().strip() - file_name = _sanitize_path_name(file_name) - url = self._abs_url_from_link(file_link) - - log.explain(f"Found exercise entry {file_name!r}") - results.append(IliasPageElement( - IliasElementType.FILE, - url, - container_name + "/" + file_name, - None # We do not have any timestamp - )) - - # Find all links to file listings (e.g. 
"Submitted Files" for groups) - file_listings: List[Tag] = container.findAll( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()} - ) - - # Add each listing as a new - for listing in file_listings: - parent_container: Tag = listing.findParent( - "div", attrs={"class": lambda x: x and "form-group" in x} - ) - label_container: Tag = parent_container.find( - attrs={"class": lambda x: x and "control-label" in x} - ) - file_name = _sanitize_path_name(label_container.getText().strip()) - url = self._abs_url_from_link(listing) - log.explain(f"Found exercise detail {file_name!r} at {url}") - results.append(IliasPageElement( - IliasElementType.EXERCISE_FILES, - url, - container_name + "/" + file_name, - None # we do not have any timestamp - )) + for result in results: + log.explain(f"Found exercise {result.name!r}") return results - def _find_normal_entries(self) -> List[IliasPageElement]: - result: List[IliasPageElement] = [] + def _find_normal_entries(self) -> list[IliasPageElement]: + result: list[IliasPageElement] = [] + links: list[Tag] = [] # Fetch all links and throw them to the general interpreter - links: List[Tag] = self._soup.select("a.il_ContainerItemTitle") + if self._is_course_overview_page(): + log.explain("Page is a course overview page, adjusting link selector") + links.extend(self._soup.select(".il-item-title > a")) + else: + links.extend(self._soup.select("a.il_ContainerItemTitle")) for link in links: abs_url = self._abs_url_from_link(link) - parents = self._find_upwards_folder_hierarchy(link) + # Make sure parents are sanitized. 
We do not want accidental parents + parents = [sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)] if parents: - element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText()) + element_name = "/".join(parents) + "/" + sanitize_path_name(link.get_text()) else: - element_name = _sanitize_path_name(link.getText()) + element_name = sanitize_path_name(link.get_text()) - element_type = self._find_type_from_link(element_name, link, abs_url) - description = self._find_link_description(link) + element_type = IliasPage._find_type_for_element( + element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link) + ) + description = IliasPage._find_link_description(link) # The last meeting on every page is expanded by default. # Its content is then shown inline *and* in the meeting page itself. @@ -660,59 +1005,113 @@ class IliasPage: if not element_type: continue - if element_type == IliasElementType.MEETING: - normalized = _sanitize_path_name(self._normalize_meeting_name(element_name)) - log.explain(f"Normalized meeting name from {element_name!r} to {normalized!r}") - element_name = normalized elif element_type == IliasElementType.FILE: - result.append(self._file_to_element(element_name, abs_url, link)) + result.append(IliasPage._file_to_element(element_name, abs_url, link)) continue - log.explain(f"Found {element_name!r}") - result.append(IliasPageElement(element_type, abs_url, element_name, description=description)) + log.explain(f"Found {element_name!r} of type {element_type}") + result.append( + IliasPageElement.create_new( + element_type, abs_url, element_name, description=description, skip_sanitize=True + ) + ) result += self._find_cards() result += self._find_mediacast_videos() + result += self._find_mob_videos() return result - def _find_mediacast_videos(self) -> List[IliasPageElement]: - videos: List[IliasPageElement] = [] + def _find_mediacast_videos(self) -> list[IliasPageElement]: + videos: list[IliasPageElement] = 
[] - for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")): - element_name = _sanitize_path_name( - elem.select_one(".ilPlayerPreviewDescription").getText().strip() - ) - if not element_name.endswith(".mp4"): - # just to make sure it has some kinda-alrightish ending - element_name = element_name + ".mp4" - video_element = elem.find(name="video") - if not video_element: - _unexpected_html_warning() - log.warn_contd(f"No