diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs deleted file mode 100644 index 27246bf..0000000 --- a/.git-blame-ignore-revs +++ /dev/null @@ -1 +0,0 @@ -2cf0e060ed126537dd993896b6aa793e2a6b9e80 diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 3891848..0000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,10 +0,0 @@ -version: 2 -updates: - - package-ecosystem: github-actions - directory: / - schedule: - interval: monthly - groups: - gh-actions: - patterns: - - "*" diff --git a/.github/workflows/build-and-release.yml b/.github/workflows/build-and-release.yml index 9cd962f..83a36e4 100644 --- a/.github/workflows/build-and-release.yml +++ b/.github/workflows/build-and-release.yml @@ -1,6 +1,6 @@ name: build-and-release -on: [push, pull_request] +on: push defaults: run: @@ -13,26 +13,28 @@ jobs: strategy: fail-fast: false matrix: - os: [ubuntu-latest, windows-latest, macos-13, macos-latest] - python: ["3.11"] + os: [ubuntu-latest, windows-latest, macos-latest] + python: ["3.9"] steps: - - uses: actions/checkout@v4 - - name: Install uv - uses: astral-sh/setup-uv@v7 + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 with: python-version: ${{ matrix.python }} - name: Set up project - run: uv sync + if: matrix.os != 'windows-latest' + run: ./scripts/setup + + - name: Set up project on windows + if: matrix.os == 'windows-latest' + # For some reason, `pip install --upgrade pip` doesn't work on + # 'windows-latest'. The installed pip version works fine however. 
+ run: ./scripts/setup --no-pip - name: Run checks - run: | - ./scripts/check - ./scripts/format - - - name: Assert no changes - run: git diff --exit-code + run: ./scripts/check - name: Build run: ./scripts/build @@ -43,9 +45,9 @@ jobs: run: mv dist/pferd* dist/pferd-${{ matrix.os }} - name: Upload binary - uses: actions/upload-artifact@v4 + uses: actions/upload-artifact@v3 with: - name: pferd-${{ matrix.os }} + name: Binaries path: dist/pferd-${{ matrix.os }} release: @@ -55,20 +57,18 @@ jobs: steps: - name: Download binaries - uses: actions/download-artifact@v4 + uses: actions/download-artifact@v3 with: - pattern: pferd-* - merge-multiple: true + name: Binaries - name: Rename binaries run: | mv pferd-ubuntu-latest pferd-linux mv pferd-windows-latest pferd-windows.exe - mv pferd-macos-13 pferd-mac-x86_64 mv pferd-macos-latest pferd-mac - name: Create release - uses: softprops/action-gh-release@v2 + uses: softprops/action-gh-release@v1 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} with: @@ -76,4 +76,3 @@ jobs: pferd-linux pferd-windows.exe pferd-mac - pferd-mac-x86_64 diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a2848c..a76508e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,101 +22,6 @@ ambiguous situations. ## Unreleased -## Added -- Store the description when using the `internet-shortcut` link format -- Support for basic auth with the kit-ipd crawler - -## Fixed -- Event loop errors on Windows with Python 3.14 -- Sanitize `/` in headings in kit-ipd crawler -- Crawl info tab again - -## 3.8.3 - 2025-07-01 - -## Added -- Support for link collections. - In "fancy" mode, a single HTML file with multiple links is generated. - In all other modes, PFERD creates a folder for the collection and a new file - for every link inside. - -## Fixed -- Crawling of exercises with instructions -- Don't download unavailable elements. 
- Elements that are unavailable (for example, because their availability is - time restricted) will not download the HTML for the info page anymore. -- `base_url` argument for `ilias-web` crawler causing crashes - -## 3.8.2 - 2025-04-29 - -## Changed -- Explicitly mention that wikis are not supported at the moment and ignore them - -## Fixed -- Ilias-native login -- Exercise crawling - -## 3.8.1 - 2025-04-17 - -## Fixed -- Description html files now specify at UTF-8 encoding -- Images in descriptions now always have a white background - -## 3.8.0 - 2025-04-16 - -### Added -- Support for ILIAS 9 - -### Changed -- Added prettier CSS to forum threads -- Downloaded forum threads now link to the forum instead of the ILIAS thread -- Increase minimum supported Python version to 3.11 -- Do not crawl nested courses (courses linked in other courses) - -## Fixed -- File links in report on Windows -- TOTP authentication in KIT Shibboleth -- Forum crawling only considering the first 20 entries - -## 3.7.0 - 2024-11-13 - -### Added -- Support for MOB videos in page descriptions -- Clickable links in the report to directly open new/modified/not-deleted files -- Support for non KIT shibboleth login - -### Changed -- Remove videos from description pages -- Perform ILIAS cycle detection after processing the transform to allow - ignoring duplicated elements -- Parse headings (h1-h3) as folders in kit-ipd crawler - -### Fixed -- Personal desktop/dashboard/favorites crawling -- Crawling of nested courses -- Downloading of links with no target URL -- Handle row flex on description pages -- Add `` heading to forum threads to fix mime type detection -- Handle groups in cards - -## 3.6.0 - 2024-10-23 - -### Added -- Generic `ilias-web` crawler and `ilias-web` CLI command -- Support for the course overview page. Using this URL as a target might cause - duplication warnings, as subgroups are listed separately. 
-- Support for named capture groups in regex transforms -- Crawl custom item groups as folders - -### Fixed -- Normalization of meeting names in cards -- Sanitization of slashes in exercise container names - -## 3.5.2 - 2024-04-14 - -### Fixed -- Crawling of personal desktop with ILIAS 8 -- Crawling of empty personal desktops - ## 3.5.1 - 2024-04-09 ### Added diff --git a/CONFIG.md b/CONFIG.md index b87f75c..5f62749 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -4,11 +4,11 @@ A config file consists of sections. A section begins with a `[section]` header, which is followed by a list of `key = value` pairs. Comments must be on their own line and start with `#`. Multiline values must be indented beyond their key. Boolean values can be `yes` or `no`. For more details and some examples on the -format, see the [configparser documentation][cp-file] -([interpolation][cp-interp] is disabled). +format, see the [configparser documentation][1] ([interpolation][2] is +disabled). -[cp-file]: "Supported INI File Structure" -[cp-interp]: "Interpolation of values" +[1]: "Supported INI File Structure" +[2]: "Interpolation of values" ## The `DEFAULT` section @@ -146,73 +146,13 @@ crawler simulate a slower, network-based crawler. This crawler crawls a KIT-IPD page by url. The root page can be crawled from outside the KIT network so you will be informed about any new/deleted files, -but downloading files requires you to be within. Adding a short delay between +but downloading files requires you to be within. Adding a show delay between requests is likely a good idea. - `target`: URL to a KIT-IPD page - `link_regex`: A regex that is matched against the `href` part of links. If it matches, the given link is downloaded as a file. This is used to extract files from KIT-IPD pages. (Default: `^.*?[^/]+\.(pdf|zip|c|cpp|java)$`) -- `auth`: Name of auth section to use for basic authentication. (Optional) - -### The `ilias-web` crawler - -This crawler crawls a generic ILIAS instance. 
- -Inspired by [this ILIAS downloader][ilias-dl], the following configurations should work -out of the box for the corresponding universities: - -[ilias-dl]: https://github.com/V3lop5/ilias-downloader/blob/main/configs "ilias-downloader configs" - -| University | `base_url` | `login_type` | `client_id` | -|-----------------|-----------------------------------------|--------------|---------------| -| FH Aachen | https://www.ili.fh-aachen.de | local | elearning | -| HHU Düsseldorf | https://ilias.hhu.de | local | UniRZ | -| Uni Köln | https://www.ilias.uni-koeln.de/ilias | local | uk | -| Uni Konstanz | https://ilias.uni-konstanz.de | local | ILIASKONSTANZ | -| Uni Stuttgart | https://ilias3.uni-stuttgart.de | local | Uni_Stuttgart | -| Uni Tübingen | https://ovidius.uni-tuebingen.de/ilias3 | shibboleth | | -| KIT ILIAS Pilot | https://pilot.ilias.studium.kit.edu | shibboleth | pilot | - -If your university isn't listed, try navigating to your instance's login page. -Assuming no custom login service is used, the URL will look something like this: - -```jinja -{{ base_url }}/login.php?client_id={{ client_id }}&cmd=force_login&lang= -``` - -If the values work, feel free to submit a PR and add them to the table above. - -- `base_url`: The URL where the ILIAS instance is located. (Required) -- `login_type`: How you authenticate. (Required) - - `local`: Use `client_id` for authentication. - - `shibboleth`: Use shibboleth for authentication. -- `client_id`: An ID used for authentication if `login_type` is `local`. Is - ignored if `login_type` is `shibboleth`. -- `target`: The ILIAS element to crawl. (Required) - - `desktop`: Crawl your personal desktop / dashboard - - ``: Crawl the course with the given id - - ``: Crawl a given element by URL (preferably the permanent URL linked - at the bottom of its ILIAS page). - This also supports the "My Courses" overview page to download *all* - courses. 
Note that this might produce confusing local directory layouts - and duplication warnings if you are a member of an ILIAS group. The - `desktop` target is generally preferable. -- `auth`: Name of auth section to use for login. (Required) -- `tfa_auth`: Name of auth section to use for two-factor authentication. Only - uses the auth section's password. (Default: Anonymous `tfa` authenticator) -- `links`: How to represent external links. (Default: `fancy`) - - `ignore`: Don't download links. - - `plaintext`: A text file containing only the URL. - - `fancy`: A HTML file looking like the ILIAS link element. - - `internet-shortcut`: An internet shortcut file (`.url` file). -- `link_redirect_delay`: Time (in seconds) until `fancy` link files will - redirect to the actual URL. Set to a negative value to disable the automatic - redirect. (Default: `-1`) -- `videos`: Whether to download videos. (Default: `no`) -- `forums`: Whether to download forum threads. (Default: `no`) -- `http_timeout`: The timeout (in seconds) for all HTTP requests. (Default: - `20.0`) ### The `kit-ilias-web` crawler @@ -292,10 +232,10 @@ is stored in the keyring. ### The `pass` authenticator -This authenticator queries the [`pass` password manager][pass] for a username -and password. It tries to be mostly compatible with [browserpass][browserpass] -and [passff][passff], so see those links for an overview of the format. If PFERD -fails to load your password, you can use the `--explain` flag to see why. +This authenticator queries the [`pass` password manager][3] for a username and +password. It tries to be mostly compatible with [browserpass][4] and +[passff][5], so see those links for an overview of the format. If PFERD fails +to load your password, you can use the `--explain` flag to see why. 
- `passname`: The name of the password to use (Required) - `username_prefixes`: A comma-separated list of username line prefixes @@ -303,9 +243,9 @@ fails to load your password, you can use the `--explain` flag to see why. - `password_prefixes`: A comma-separated list of password line prefixes (Default: `password,pass,secret`) -[pass]: "Pass: The Standard Unix Password Manager" -[browserpass]: "Organizing password store" -[passff]: "Multi-line format" +[3]: "Pass: The Standard Unix Password Manager" +[4]: "Organizing password store" +[5]: "Multi-line format" ### The `tfa` authenticator @@ -404,8 +344,7 @@ matches `SOURCE`, the output path is created using `TARGET` as template. be referred to as `{g}` (e.g. `{g3}`). `{g0}` refers to the original path. If capturing group *n*'s contents are a valid integer, the integer value is available as `{i}` (e.g. `{i3}`). If capturing group *n*'s contents are a -valid float, the float value is available as `{f}` (e.g. `{f3}`). Named capture -groups (e.g. `(?P)`) are available by their name (e.g. `{name}`). If a +valid float, the float value is available as `{f}` (e.g. `{f3}`). If a capturing group is not present (e.g. when matching the string `cd` with the regex `(ab)?cd`), the corresponding variables are not defined. diff --git a/DEV.md b/DEV.md index 8cc42c2..f577b93 100644 --- a/DEV.md +++ b/DEV.md @@ -9,25 +9,30 @@ particular [this][ppug-1] and [this][ppug-2] guide). ## Setting up a dev environment -The use of [venv][venv] and [uv][uv] is recommended. To initially set up a -development environment, run these commands in the same directory as this file: +The use of [venv][venv] is recommended. To initially set up a development +environment, run these commands in the same directory as this file: ``` -$ uv sync +$ python -m venv .venv $ . .venv/bin/activate +$ ./scripts/setup ``` -This install all required dependencies and tools. 
It also installs PFERD as -*editable*, which means that you can just run `pferd` as if it was installed -normally. Since PFERD was installed with `--editable`, there is no need to -re-run `uv sync` when the source code is changed. +The setup script installs a few required dependencies and tools. It also +installs PFERD via `pip install --editable .`, which means that you can just run +`pferd` as if it was installed normally. Since PFERD was installed with +`--editable`, there is no need to re-run `pip install` when the source code is +changed. + +If you get any errors because pip can't update itself, try running +`./scripts/setup --no-pip` instead of `./scripts/setup`. For more details, see [this part of the Python Tutorial][venv-tut] and [this section on "development mode"][ppug-dev]. [venv]: "venv - Creation of virtual environments" [venv-tut]: "12. Virtual Environments and Packages" -[uv]: "uv - An extremely fast Python package and project manager" +[ppug-dev]: "Working in “development mode”" ## Checking and formatting the code diff --git a/LICENSE b/LICENSE index ccccbe3..d81e827 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -Copyright 2019-2024 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, +Copyright 2019-2021 Garmelon, I-Al-Istannen, danstooamerican, pavelzw, TheChristophe, Scriptim, thelukasprobst, Toorero, - Mr-Pine, p-fruck, PinieP + Mr-Pine Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in diff --git a/PFERD/__main__.py b/PFERD/__main__.py index 2de9dbc..cb8c67c 100644 --- a/PFERD/__main__.py +++ b/PFERD/__main__.py @@ -133,8 +133,7 @@ def main() -> None: # https://bugs.python.org/issue39232 # https://github.com/encode/httpx/issues/914#issuecomment-780023632 # TODO Fix this properly - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = asyncio.get_event_loop() loop.run_until_complete(pferd.run(args.debug_transforms)) 
loop.run_until_complete(asyncio.sleep(1)) loop.close() diff --git a/PFERD/auth/__init__.py b/PFERD/auth/__init__.py index 7295c7a..aa3ba8e 100644 --- a/PFERD/auth/__init__.py +++ b/PFERD/auth/__init__.py @@ -1,5 +1,5 @@ -from collections.abc import Callable from configparser import SectionProxy +from typing import Callable, Dict from ..config import Config from .authenticator import Authenticator, AuthError, AuthLoadError, AuthSection # noqa: F401 @@ -9,19 +9,21 @@ from .pass_ import PassAuthenticator, PassAuthSection from .simple import SimpleAuthenticator, SimpleAuthSection from .tfa import TfaAuthenticator -AuthConstructor = Callable[ - [ - str, # Name (without the "auth:" prefix) - SectionProxy, # Authenticator's section of global config - Config, # Global config - ], - Authenticator, -] +AuthConstructor = Callable[[ + str, # Name (without the "auth:" prefix) + SectionProxy, # Authenticator's section of global config + Config, # Global config +], Authenticator] -AUTHENTICATORS: dict[str, AuthConstructor] = { - "credential-file": lambda n, s, c: CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), - "keyring": lambda n, s, c: KeyringAuthenticator(n, KeyringAuthSection(s)), - "pass": lambda n, s, c: PassAuthenticator(n, PassAuthSection(s)), - "simple": lambda n, s, c: SimpleAuthenticator(n, SimpleAuthSection(s)), - "tfa": lambda n, s, c: TfaAuthenticator(n), +AUTHENTICATORS: Dict[str, AuthConstructor] = { + "credential-file": lambda n, s, c: + CredentialFileAuthenticator(n, CredentialFileAuthSection(s), c), + "keyring": lambda n, s, c: + KeyringAuthenticator(n, KeyringAuthSection(s)), + "pass": lambda n, s, c: + PassAuthenticator(n, PassAuthSection(s)), + "simple": lambda n, s, c: + SimpleAuthenticator(n, SimpleAuthSection(s)), + "tfa": lambda n, s, c: + TfaAuthenticator(n), } diff --git a/PFERD/auth/authenticator.py b/PFERD/auth/authenticator.py index 417b7ba..643a2d5 100644 --- a/PFERD/auth/authenticator.py +++ b/PFERD/auth/authenticator.py @@ -1,4 
+1,5 @@ from abc import ABC, abstractmethod +from typing import Tuple from ..config import Section @@ -34,7 +35,7 @@ class Authenticator(ABC): self.name = name @abstractmethod - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: pass async def username(self) -> str: diff --git a/PFERD/auth/credential_file.py b/PFERD/auth/credential_file.py index cb7834c..94ffa73 100644 --- a/PFERD/auth/credential_file.py +++ b/PFERD/auth/credential_file.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Tuple from ..config import Config from ..utils import fmt_real_path @@ -22,9 +23,7 @@ class CredentialFileAuthenticator(Authenticator): with open(path, encoding="utf-8") as f: lines = list(f) except UnicodeDecodeError: - raise AuthLoadError( - f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8" - ) from None + raise AuthLoadError(f"Credential file at {fmt_real_path(path)} is not encoded using UTF-8") except OSError as e: raise AuthLoadError(f"No credential file at {fmt_real_path(path)}") from e @@ -43,5 +42,5 @@ class CredentialFileAuthenticator(Authenticator): self._username = uline[9:] self._password = pline[9:] - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: return self._username, self._password diff --git a/PFERD/auth/keyring.py b/PFERD/auth/keyring.py index 414640a..c14f6fb 100644 --- a/PFERD/auth/keyring.py +++ b/PFERD/auth/keyring.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple import keyring @@ -17,6 +17,7 @@ class KeyringAuthSection(AuthSection): class KeyringAuthenticator(Authenticator): + def __init__(self, name: str, section: KeyringAuthSection) -> None: super().__init__(name) @@ -27,7 +28,7 @@ class KeyringAuthenticator(Authenticator): self._password_invalidated = False self._username_fixed = section.username() is not None - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> 
Tuple[str, str]: # Request the username if self._username is None: async with log.exclusive_output(): diff --git a/PFERD/auth/pass_.py b/PFERD/auth/pass_.py index c5d9b24..4c8e775 100644 --- a/PFERD/auth/pass_.py +++ b/PFERD/auth/pass_.py @@ -1,5 +1,6 @@ import re import subprocess +from typing import List, Tuple from ..logging import log from .authenticator import Authenticator, AuthError, AuthSection @@ -11,11 +12,11 @@ class PassAuthSection(AuthSection): self.missing_value("passname") return value - def username_prefixes(self) -> list[str]: + def username_prefixes(self) -> List[str]: value = self.s.get("username_prefixes", "login,username,user") return [prefix.lower() for prefix in value.split(",")] - def password_prefixes(self) -> list[str]: + def password_prefixes(self) -> List[str]: value = self.s.get("password_prefixes", "password,pass,secret") return [prefix.lower() for prefix in value.split(",")] @@ -30,14 +31,14 @@ class PassAuthenticator(Authenticator): self._username_prefixes = section.username_prefixes() self._password_prefixes = section.password_prefixes() - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: log.explain_topic("Obtaining credentials from pass") try: log.explain(f"Calling 'pass show {self._passname}'") result = subprocess.check_output(["pass", "show", self._passname], text=True) except subprocess.CalledProcessError as e: - raise AuthError(f"Failed to get password info from {self._passname}: {e}") from e + raise AuthError(f"Failed to get password info from {self._passname}: {e}") prefixed = {} unprefixed = [] diff --git a/PFERD/auth/simple.py b/PFERD/auth/simple.py index dea4b67..831c12f 100644 --- a/PFERD/auth/simple.py +++ b/PFERD/auth/simple.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Tuple from ..logging import log from ..utils import agetpass, ainput @@ -23,7 +23,7 @@ class SimpleAuthenticator(Authenticator): self._username_fixed = self.username is not 
None self._password_fixed = self.password is not None - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: if self._username is not None and self._password is not None: return self._username, self._password diff --git a/PFERD/auth/tfa.py b/PFERD/auth/tfa.py index 6ae48fe..26b1383 100644 --- a/PFERD/auth/tfa.py +++ b/PFERD/auth/tfa.py @@ -1,3 +1,5 @@ +from typing import Tuple + from ..logging import log from ..utils import ainput from .authenticator import Authenticator, AuthError @@ -15,7 +17,7 @@ class TfaAuthenticator(Authenticator): code = await ainput("TFA code: ") return code - async def credentials(self) -> tuple[str, str]: + async def credentials(self) -> Tuple[str, str]: raise AuthError("TFA authenticator does not support usernames") def invalidate_username(self) -> None: diff --git a/PFERD/cli/__init__.py b/PFERD/cli/__init__.py index c89f6f4..efa8f00 100644 --- a/PFERD/cli/__init__.py +++ b/PFERD/cli/__init__.py @@ -8,7 +8,6 @@ # well. from . import command_local # noqa: F401 imported but unused -from . import command_ilias_web # noqa: F401 imported but unused from . import command_kit_ilias_web # noqa: F401 imported but unused from . 
import command_kit_ipd # noqa: F401 imported but unused from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused diff --git a/PFERD/cli/command_ilias_web.py b/PFERD/cli/command_ilias_web.py deleted file mode 100644 index b68e48f..0000000 --- a/PFERD/cli/command_ilias_web.py +++ /dev/null @@ -1,53 +0,0 @@ -import argparse -import configparser - -from ..logging import log -from .common_ilias_args import configure_common_group_args, load_common -from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler - -COMMAND_NAME = "ilias-web" - -SUBPARSER = SUBPARSERS.add_parser( - COMMAND_NAME, - parents=[CRAWLER_PARSER], -) - -GROUP = SUBPARSER.add_argument_group( - title=f"{COMMAND_NAME} crawler arguments", - description=f"arguments for the '{COMMAND_NAME}' crawler", -) - -GROUP.add_argument( - "--base-url", - type=str, - metavar="BASE_URL", - help="The base url of the ilias instance", -) - -GROUP.add_argument( - "--client-id", - type=str, - metavar="CLIENT_ID", - help="The client id of the ilias instance", -) - -configure_common_group_args(GROUP) - - -def load(args: argparse.Namespace, parser: configparser.ConfigParser) -> None: - log.explain(f"Creating config for command '{COMMAND_NAME}'") - - parser["crawl:ilias"] = {} - section = parser["crawl:ilias"] - load_crawler(args, section) - - section["type"] = COMMAND_NAME - if args.base_url is not None: - section["base_url"] = args.base_url - if args.client_id is not None: - section["client_id"] = args.client_id - - load_common(section, args, parser) - - -SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_kit_ilias_web.py b/PFERD/cli/command_kit_ilias_web.py index b3b45c5..de74fc3 100644 --- a/PFERD/cli/command_kit_ilias_web.py +++ b/PFERD/cli/command_kit_ilias_web.py @@ -1,37 +1,120 @@ import argparse import configparser +from pathlib import Path +from ..crawl.ilias.file_templates import Links from ..logging import log -from .common_ilias_args import 
configure_common_group_args, load_common -from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler - -COMMAND_NAME = "kit-ilias-web" +from .parser import (CRAWLER_PARSER, SUBPARSERS, BooleanOptionalAction, ParserLoadError, load_crawler, + show_value_error) SUBPARSER = SUBPARSERS.add_parser( - COMMAND_NAME, + "kit-ilias-web", parents=[CRAWLER_PARSER], ) GROUP = SUBPARSER.add_argument_group( - title=f"{COMMAND_NAME} crawler arguments", - description=f"arguments for the '{COMMAND_NAME}' crawler", + title="kit-ilias-web crawler arguments", + description="arguments for the 'kit-ilias-web' crawler", +) +GROUP.add_argument( + "target", + type=str, + metavar="TARGET", + help="course id, 'desktop', or ILIAS URL to crawl" +) +GROUP.add_argument( + "output", + type=Path, + metavar="OUTPUT", + help="output directory" +) +GROUP.add_argument( + "--username", "-u", + type=str, + metavar="USERNAME", + help="user name for authentication" +) +GROUP.add_argument( + "--keyring", + action=BooleanOptionalAction, + help="use the system keyring to store and retrieve passwords" +) +GROUP.add_argument( + "--credential-file", + type=Path, + metavar="PATH", + help="read username and password from a credential file" +) +GROUP.add_argument( + "--links", + type=show_value_error(Links.from_string), + metavar="OPTION", + help="how to represent external links" +) +GROUP.add_argument( + "--link-redirect-delay", + type=int, + metavar="SECONDS", + help="time before 'fancy' links redirect to to their target (-1 to disable)" +) +GROUP.add_argument( + "--videos", + action=BooleanOptionalAction, + help="crawl and download videos" +) +GROUP.add_argument( + "--forums", + action=BooleanOptionalAction, + help="crawl and download forum posts" +) +GROUP.add_argument( + "--http-timeout", "-t", + type=float, + metavar="SECONDS", + help="timeout for all HTTP requests" ) - -configure_common_group_args(GROUP) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + 
parser: configparser.ConfigParser, ) -> None: - log.explain(f"Creating config for command '{COMMAND_NAME}'") + log.explain("Creating config for command 'kit-ilias-web'") parser["crawl:ilias"] = {} section = parser["crawl:ilias"] load_crawler(args, section) - section["type"] = COMMAND_NAME - load_common(section, args, parser) + section["type"] = "kit-ilias-web" + section["target"] = str(args.target) + section["output_dir"] = str(args.output) + section["auth"] = "auth:ilias" + if args.links is not None: + section["links"] = str(args.links.value) + if args.link_redirect_delay is not None: + section["link_redirect_delay"] = str(args.link_redirect_delay) + if args.videos is not None: + section["videos"] = "yes" if args.videos else "no" + if args.forums is not None: + section["forums"] = "yes" if args.forums else "no" + if args.http_timeout is not None: + section["http_timeout"] = str(args.http_timeout) + + parser["auth:ilias"] = {} + auth_section = parser["auth:ilias"] + if args.credential_file is not None: + if args.username is not None: + raise ParserLoadError("--credential-file and --username can't be used together") + if args.keyring: + raise ParserLoadError("--credential-file and --keyring can't be used together") + auth_section["type"] = "credential-file" + auth_section["path"] = str(args.credential_file) + elif args.keyring: + auth_section["type"] = "keyring" + else: + auth_section["type"] = "simple" + if args.username is not None: + auth_section["username"] = args.username SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_kit_ipd.py b/PFERD/cli/command_kit_ipd.py index a80af03..b53e67e 100644 --- a/PFERD/cli/command_kit_ipd.py +++ b/PFERD/cli/command_kit_ipd.py @@ -18,30 +18,25 @@ GROUP.add_argument( "--link-regex", type=str, metavar="REGEX", - help="href-matching regex to identify downloadable files", -) -GROUP.add_argument( - "--basic-auth", - action="store_true", - help="enable basic authentication", + help="href-matching regex to identify 
downloadable files" ) GROUP.add_argument( "target", type=str, metavar="TARGET", - help="url to crawl", + help="url to crawl" ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory", + help="output directory" ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'kit-ipd'") @@ -55,11 +50,5 @@ def load( if args.link_regex: section["link_regex"] = str(args.link_regex) - if args.basic_auth: - section["auth"] = "auth:kit-ipd" - parser["auth:kit-ipd"] = {} - auth_section = parser["auth:kit-ipd"] - auth_section["type"] = "simple" - SUBPARSER.set_defaults(command=load) diff --git a/PFERD/cli/command_local.py b/PFERD/cli/command_local.py index 6016afa..309c42f 100644 --- a/PFERD/cli/command_local.py +++ b/PFERD/cli/command_local.py @@ -18,37 +18,37 @@ GROUP.add_argument( "target", type=Path, metavar="TARGET", - help="directory to crawl", + help="directory to crawl" ) GROUP.add_argument( "output", type=Path, metavar="OUTPUT", - help="output directory", + help="output directory" ) GROUP.add_argument( "--crawl-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for crawl requests", + help="artificial delay to simulate for crawl requests" ) GROUP.add_argument( "--download-delay", type=float, metavar="SECONDS", - help="artificial delay to simulate for download requests", + help="artificial delay to simulate for download requests" ) GROUP.add_argument( "--download-speed", type=int, metavar="BYTES_PER_SECOND", - help="download speed to simulate", + help="download speed to simulate" ) def load( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: log.explain("Creating config for command 'local'") diff --git a/PFERD/cli/common_ilias_args.py b/PFERD/cli/common_ilias_args.py deleted file mode 100644 index 
edad6da..0000000 --- a/PFERD/cli/common_ilias_args.py +++ /dev/null @@ -1,106 +0,0 @@ -import argparse -import configparser -from pathlib import Path - -from ..crawl.ilias.file_templates import Links -from .parser import BooleanOptionalAction, ParserLoadError, show_value_error - - -def configure_common_group_args(group: argparse._ArgumentGroup) -> None: - """These arguments are shared between the KIT and generic Ilias web command.""" - group.add_argument( - "target", - type=str, - metavar="TARGET", - help="course id, 'desktop', or ILIAS URL to crawl", - ) - group.add_argument( - "output", - type=Path, - metavar="OUTPUT", - help="output directory", - ) - group.add_argument( - "--username", - "-u", - type=str, - metavar="USERNAME", - help="user name for authentication", - ) - group.add_argument( - "--keyring", - action=BooleanOptionalAction, - help="use the system keyring to store and retrieve passwords", - ) - group.add_argument( - "--credential-file", - type=Path, - metavar="PATH", - help="read username and password from a credential file", - ) - group.add_argument( - "--links", - type=show_value_error(Links.from_string), - metavar="OPTION", - help="how to represent external links", - ) - group.add_argument( - "--link-redirect-delay", - type=int, - metavar="SECONDS", - help="time before 'fancy' links redirect to to their target (-1 to disable)", - ) - group.add_argument( - "--videos", - action=BooleanOptionalAction, - help="crawl and download videos", - ) - group.add_argument( - "--forums", - action=BooleanOptionalAction, - help="crawl and download forum posts", - ) - group.add_argument( - "--http-timeout", - "-t", - type=float, - metavar="SECONDS", - help="timeout for all HTTP requests", - ) - - -def load_common( - section: configparser.SectionProxy, - args: argparse.Namespace, - parser: configparser.ConfigParser, -) -> None: - """Load common config between generic and KIT ilias web command""" - section["target"] = str(args.target) - section["output_dir"] = 
str(args.output) - section["auth"] = "auth:ilias" - if args.links is not None: - section["links"] = str(args.links.value) - if args.link_redirect_delay is not None: - section["link_redirect_delay"] = str(args.link_redirect_delay) - if args.videos is not None: - section["videos"] = "yes" if args.videos else "no" - if args.forums is not None: - section["forums"] = "yes" if args.forums else "no" - if args.http_timeout is not None: - section["http_timeout"] = str(args.http_timeout) - - parser["auth:ilias"] = {} - auth_section = parser["auth:ilias"] - if args.credential_file is not None: - if args.username is not None: - raise ParserLoadError("--credential-file and --username can't be used together") - if args.keyring: - raise ParserLoadError("--credential-file and --keyring can't be used together") - auth_section["type"] = "credential-file" - auth_section["path"] = str(args.credential_file) - elif args.keyring: - auth_section["type"] = "keyring" - else: - auth_section["type"] = "simple" - if args.username is not None: - auth_section["username"] = args.username diff --git a/PFERD/cli/parser.py b/PFERD/cli/parser.py index c9bec13..be483fd 100644 --- a/PFERD/cli/parser.py +++ b/PFERD/cli/parser.py @@ -1,9 +1,8 @@ import argparse import configparser from argparse import ArgumentTypeError -from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Optional +from typing import Any, Callable, List, Optional, Sequence, Union from ..output_dir import OnConflict, Redownload from ..version import NAME, VERSION @@ -16,15 +15,15 @@ class ParserLoadError(Exception): # TODO Replace with argparse version when updating to 3.9? 
class BooleanOptionalAction(argparse.Action): def __init__( - self, - option_strings: list[str], - dest: Any, - default: Any = None, - type: Any = None, - choices: Any = None, - required: Any = False, - help: Any = None, - metavar: Any = None, + self, + option_strings: List[str], + dest: Any, + default: Any = None, + type: Any = None, + choices: Any = None, + required: Any = False, + help: Any = None, + metavar: Any = None, ): if len(option_strings) != 1: raise ValueError("There must be exactly one option string") @@ -49,11 +48,11 @@ class BooleanOptionalAction(argparse.Action): ) def __call__( - self, - parser: argparse.ArgumentParser, - namespace: argparse.Namespace, - values: str | Sequence[Any] | None, - option_string: Optional[str] = None, + self, + parser: argparse.ArgumentParser, + namespace: argparse.Namespace, + values: Union[str, Sequence[Any], None], + option_string: Optional[str] = None, ) -> None: if option_string and option_string in self.option_strings: value = not option_string.startswith("--no-") @@ -68,13 +67,11 @@ def show_value_error(inner: Callable[[str], Any]) -> Callable[[str], Any]: Some validation functions (like the from_string in our enums) raise a ValueError. Argparse only pretty-prints ArgumentTypeErrors though, so we need to wrap our ValueErrors. 
""" - def wrapper(input: str) -> Any: try: return inner(input) except ValueError as e: - raise ArgumentTypeError(e) from e - + raise ArgumentTypeError(e) return wrapper @@ -84,57 +81,52 @@ CRAWLER_PARSER_GROUP = CRAWLER_PARSER.add_argument_group( description="arguments common to all crawlers", ) CRAWLER_PARSER_GROUP.add_argument( - "--redownload", - "-r", + "--redownload", "-r", type=show_value_error(Redownload.from_string), metavar="OPTION", - help="when to download a file that's already present locally", + help="when to download a file that's already present locally" ) CRAWLER_PARSER_GROUP.add_argument( "--on-conflict", type=show_value_error(OnConflict.from_string), metavar="OPTION", - help="what to do when local and remote files or directories differ", + help="what to do when local and remote files or directories differ" ) CRAWLER_PARSER_GROUP.add_argument( - "--transform", - "-T", + "--transform", "-T", action="append", type=str, metavar="RULE", - help="add a single transformation rule. Can be specified multiple times", + help="add a single transformation rule. 
Can be specified multiple times" ) CRAWLER_PARSER_GROUP.add_argument( - "--tasks", - "-n", + "--tasks", "-n", type=int, metavar="N", - help="maximum number of concurrent tasks (crawling, downloading)", + help="maximum number of concurrent tasks (crawling, downloading)" ) CRAWLER_PARSER_GROUP.add_argument( - "--downloads", - "-N", + "--downloads", "-N", type=int, metavar="N", - help="maximum number of tasks that may download data at the same time", + help="maximum number of tasks that may download data at the same time" ) CRAWLER_PARSER_GROUP.add_argument( - "--task-delay", - "-d", + "--task-delay", "-d", type=float, metavar="SECONDS", - help="time the crawler should wait between subsequent tasks", + help="time the crawler should wait between subsequent tasks" ) CRAWLER_PARSER_GROUP.add_argument( "--windows-paths", action=BooleanOptionalAction, - help="whether to repair invalid paths on windows", + help="whether to repair invalid paths on windows" ) def load_crawler( - args: argparse.Namespace, - section: configparser.SectionProxy, + args: argparse.Namespace, + section: configparser.SectionProxy, ) -> None: if args.redownload is not None: section["redownload"] = args.redownload.value @@ -160,79 +152,79 @@ PARSER.add_argument( version=f"{NAME} {VERSION} (https://github.com/Garmelon/PFERD)", ) PARSER.add_argument( - "--config", - "-c", + "--config", "-c", type=Path, metavar="PATH", - help="custom config file", + help="custom config file" ) PARSER.add_argument( "--dump-config", action="store_true", - help="dump current configuration to the default config path and exit", + help="dump current configuration to the default config path and exit" ) PARSER.add_argument( "--dump-config-to", metavar="PATH", - help="dump current configuration to a file and exit. Use '-' as path to print to stdout instead", + help="dump current configuration to a file and exit." 
+ " Use '-' as path to print to stdout instead" ) PARSER.add_argument( "--debug-transforms", action="store_true", - help="apply transform rules to files of previous run", + help="apply transform rules to files of previous run" ) PARSER.add_argument( - "--crawler", - "-C", + "--crawler", "-C", action="append", type=str, metavar="NAME", - help="only execute a single crawler. Can be specified multiple times to execute multiple crawlers", + help="only execute a single crawler." + " Can be specified multiple times to execute multiple crawlers" ) PARSER.add_argument( - "--skip", - "-S", + "--skip", "-S", action="append", type=str, metavar="NAME", - help="don't execute this particular crawler. Can be specified multiple times to skip multiple crawlers", + help="don't execute this particular crawler." + " Can be specified multiple times to skip multiple crawlers" ) PARSER.add_argument( "--working-dir", type=Path, metavar="PATH", - help="custom working directory", + help="custom working directory" ) PARSER.add_argument( "--explain", action=BooleanOptionalAction, - help="log and explain in detail what PFERD is doing", + help="log and explain in detail what PFERD is doing" ) PARSER.add_argument( "--status", action=BooleanOptionalAction, - help="print status updates while PFERD is crawling", + help="print status updates while PFERD is crawling" ) PARSER.add_argument( "--report", action=BooleanOptionalAction, - help="print a report of all local changes before exiting", + help="print a report of all local changes before exiting" ) PARSER.add_argument( "--share-cookies", action=BooleanOptionalAction, - help="whether crawlers should share cookies where applicable", + help="whether crawlers should share cookies where applicable" ) PARSER.add_argument( "--show-not-deleted", action=BooleanOptionalAction, - help="print messages in status and report when PFERD did not delete a local only file", + help="print messages in status and report when PFERD did not delete a local only file" ) 
def load_default_section( - args: argparse.Namespace, - parser: configparser.ConfigParser, + args: argparse.Namespace, + parser: configparser.ConfigParser, ) -> None: section = parser[parser.default_section] diff --git a/PFERD/config.py b/PFERD/config.py index 7da2889..b2cff4e 100644 --- a/PFERD/config.py +++ b/PFERD/config.py @@ -3,7 +3,7 @@ import os import sys from configparser import ConfigParser, SectionProxy from pathlib import Path -from typing import Any, NoReturn, Optional +from typing import Any, List, NoReturn, Optional, Tuple from rich.markup import escape @@ -53,10 +53,10 @@ class Section: raise ConfigOptionError(self.s.name, key, desc) def invalid_value( - self, - key: str, - value: Any, - reason: Optional[str], + self, + key: str, + value: Any, + reason: Optional[str], ) -> NoReturn: if reason is None: self.error(key, f"Invalid value {value!r}") @@ -126,13 +126,13 @@ class Config: with open(path, encoding="utf-8") as f: parser.read_file(f, source=str(path)) except FileNotFoundError: - raise ConfigLoadError(path, "File does not exist") from None + raise ConfigLoadError(path, "File does not exist") except IsADirectoryError: - raise ConfigLoadError(path, "That's a directory, not a file") from None + raise ConfigLoadError(path, "That's a directory, not a file") except PermissionError: - raise ConfigLoadError(path, "Insufficient permissions") from None + raise ConfigLoadError(path, "Insufficient permissions") except UnicodeDecodeError: - raise ConfigLoadError(path, "File is not encoded using UTF-8") from None + raise ConfigLoadError(path, "File is not encoded using UTF-8") def dump(self, path: Optional[Path] = None) -> None: """ @@ -150,8 +150,8 @@ class Config: try: path.parent.mkdir(parents=True, exist_ok=True) - except PermissionError as e: - raise ConfigDumpError(path, "Could not create parent directory") from e + except PermissionError: + raise ConfigDumpError(path, "Could not create parent directory") try: # Ensuring we don't accidentally overwrite 
any existing files by @@ -167,16 +167,16 @@ class Config: with open(path, "w", encoding="utf-8") as f: self._parser.write(f) else: - raise ConfigDumpError(path, "File already exists") from None + raise ConfigDumpError(path, "File already exists") except IsADirectoryError: - raise ConfigDumpError(path, "That's a directory, not a file") from None - except PermissionError as e: - raise ConfigDumpError(path, "Insufficient permissions") from e + raise ConfigDumpError(path, "That's a directory, not a file") + except PermissionError: + raise ConfigDumpError(path, "Insufficient permissions") def dump_to_stdout(self) -> None: self._parser.write(sys.stdout) - def crawl_sections(self) -> list[tuple[str, SectionProxy]]: + def crawl_sections(self) -> List[Tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("crawl:"): @@ -184,7 +184,7 @@ class Config: return result - def auth_sections(self) -> list[tuple[str, SectionProxy]]: + def auth_sections(self) -> List[Tuple[str, SectionProxy]]: result = [] for name, proxy in self._parser.items(): if name.startswith("auth:"): diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py index 9ba6a37..1f8bd59 100644 --- a/PFERD/crawl/__init__.py +++ b/PFERD/crawl/__init__.py @@ -1,26 +1,25 @@ -from collections.abc import Callable from configparser import SectionProxy +from typing import Callable, Dict from ..auth import Authenticator from ..config import Config from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401 -from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection +from .ilias import KitIliasWebCrawler, KitIliasWebCrawlerSection from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection -CrawlerConstructor = Callable[ - [ - str, # Name (without the "crawl:" prefix) - SectionProxy, # Crawler's section of global config - Config, # Global config - dict[str, 
Authenticator], # Loaded authenticators by name - ], - Crawler, -] +CrawlerConstructor = Callable[[ + str, # Name (without the "crawl:" prefix) + SectionProxy, # Crawler's section of global config + Config, # Global config + Dict[str, Authenticator], # Loaded authenticators by name +], Crawler] -CRAWLERS: dict[str, CrawlerConstructor] = { - "local": lambda n, s, c, a: LocalCrawler(n, LocalCrawlerSection(s), c), - "ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), - "kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), - "kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c, a), +CRAWLERS: Dict[str, CrawlerConstructor] = { + "local": lambda n, s, c, a: + LocalCrawler(n, LocalCrawlerSection(s), c), + "kit-ilias-web": lambda n, s, c, a: + KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), + "kit-ipd": lambda n, s, c, a: + KitIpdCrawler(n, KitIpdCrawlerSection(s), c), } diff --git a/PFERD/crawl/crawler.py b/PFERD/crawl/crawler.py index e2cdf30..0e67c02 100644 --- a/PFERD/crawl/crawler.py +++ b/PFERD/crawl/crawler.py @@ -1,10 +1,10 @@ import asyncio import os from abc import ABC, abstractmethod -from collections.abc import Awaitable, Callable, Coroutine, Sequence +from collections.abc import Awaitable, Coroutine from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Optional, TypeVar +from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, TypeVar from ..auth import Authenticator from ..config import Config, Section @@ -116,7 +116,7 @@ class CrawlToken(ReusableAsyncContextManager[ProgressBar]): return bar -class DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): +class DownloadToken(ReusableAsyncContextManager[Tuple[ProgressBar, FileSink]]): def __init__(self, limiter: Limiter, fs_token: FileSinkToken, path: PurePath): super().__init__() @@ -128,13 +128,12 @@ class 
DownloadToken(ReusableAsyncContextManager[tuple[ProgressBar, FileSink]]): def path(self) -> PurePath: return self._path - async def _on_aenter(self) -> tuple[ProgressBar, FileSink]: + async def _on_aenter(self) -> Tuple[ProgressBar, FileSink]: await self._stack.enter_async_context(self._limiter.limit_download()) sink = await self._stack.enter_async_context(self._fs_token) # The "Downloaded ..." message is printed in the output dir, not here - bar = self._stack.enter_context( - log.download_bar("[bold bright_cyan]", "Downloading", fmt_path(self._path)) - ) + bar = self._stack.enter_context(log.download_bar("[bold bright_cyan]", "Downloading", + fmt_path(self._path))) return bar, sink @@ -150,7 +149,9 @@ class CrawlerSection(Section): return self.s.getboolean("skip", fallback=False) def output_dir(self, name: str) -> Path: - name = name.removeprefix("crawl:") + # TODO Use removeprefix() after switching to 3.9 + if name.startswith("crawl:"): + name = name[len("crawl:"):] return Path(self.s.get("output_dir", name)).expanduser() def redownload(self) -> Redownload: @@ -205,7 +206,7 @@ class CrawlerSection(Section): on_windows = os.name == "nt" return self.s.getboolean("windows_paths", fallback=on_windows) - def auth(self, authenticators: dict[str, Authenticator]) -> Authenticator: + def auth(self, authenticators: Dict[str, Authenticator]) -> Authenticator: value = self.s.get("auth") if value is None: self.missing_value("auth") @@ -217,10 +218,10 @@ class CrawlerSection(Section): class Crawler(ABC): def __init__( - self, - name: str, - section: CrawlerSection, - config: Config, + self, + name: str, + section: CrawlerSection, + config: Config, ) -> None: """ Initialize a crawler from its name and its section in the config file. 
@@ -257,12 +258,8 @@ class Crawler(ABC): def prev_report(self) -> Optional[Report]: return self._output_dir.prev_report - @property - def output_dir(self) -> OutputDirectory: - return self._output_dir - @staticmethod - async def gather(awaitables: Sequence[Awaitable[Any]]) -> list[Any]: + async def gather(awaitables: Sequence[Awaitable[Any]]) -> List[Any]: """ Similar to asyncio.gather. However, in the case of an exception, all still running tasks are cancelled and the exception is rethrown. @@ -293,39 +290,12 @@ class Crawler(ABC): log.explain("Answer: Yes") return CrawlToken(self._limiter, path) - def should_try_download( - self, - path: PurePath, - *, - etag_differs: Optional[bool] = None, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, - ) -> bool: - log.explain_topic(f"Decision: Should Download {fmt_path(path)}") - - if self._transformer.transform(path) is None: - log.explain("Answer: No (ignored)") - return False - - should_download = self._output_dir.should_try_download( - path, etag_differs=etag_differs, mtime=mtime, redownload=redownload, on_conflict=on_conflict - ) - if should_download: - log.explain("Answer: Yes") - return True - else: - log.explain("Answer: No") - return False - async def download( - self, - path: PurePath, - *, - etag_differs: Optional[bool] = None, - mtime: Optional[datetime] = None, - redownload: Optional[Redownload] = None, - on_conflict: Optional[OnConflict] = None, + self, + path: PurePath, + mtime: Optional[datetime] = None, + redownload: Optional[Redownload] = None, + on_conflict: Optional[OnConflict] = None, ) -> Optional[DownloadToken]: log.explain_topic(f"Decision: Download {fmt_path(path)}") path = self._deduplicator.mark(path) @@ -337,14 +307,7 @@ class Crawler(ABC): log.status("[bold bright_black]", "Ignored", fmt_path(path)) return None - fs_token = await self._output_dir.download( - path, - transformed_path, - etag_differs=etag_differs, - 
mtime=mtime, - redownload=redownload, - on_conflict=on_conflict, - ) + fs_token = await self._output_dir.download(path, transformed_path, mtime, redownload, on_conflict) if fs_token is None: log.explain("Answer: No") return None @@ -394,7 +357,7 @@ class Crawler(ABC): log.warn("Couldn't find or load old report") return - seen: set[PurePath] = set() + seen: Set[PurePath] = set() for known in sorted(self.prev_report.found_paths): looking_at = list(reversed(known.parents)) + [known] for path in looking_at: diff --git a/PFERD/crawl/http_crawler.py b/PFERD/crawl/http_crawler.py index 49d6013..44ec4dd 100644 --- a/PFERD/crawl/http_crawler.py +++ b/PFERD/crawl/http_crawler.py @@ -1,39 +1,35 @@ import asyncio import http.cookies import ssl -from datetime import datetime from pathlib import Path, PurePath -from typing import Any, Optional +from typing import Any, Dict, List, Optional import aiohttp import certifi from aiohttp.client import ClientTimeout -from bs4 import Tag from ..auth import Authenticator from ..config import Config from ..logging import log -from ..utils import fmt_real_path, sanitize_path_name +from ..utils import fmt_real_path from ..version import NAME, VERSION from .crawler import Crawler, CrawlerSection -ETAGS_CUSTOM_REPORT_VALUE_KEY = "etags" - class HttpCrawlerSection(CrawlerSection): def http_timeout(self) -> float: - return self.s.getfloat("http_timeout", fallback=30) + return self.s.getfloat("http_timeout", fallback=20) class HttpCrawler(Crawler): COOKIE_FILE = PurePath(".cookies") def __init__( - self, - name: str, - section: HttpCrawlerSection, - config: Config, - shared_auth: Optional[Authenticator] = None, + self, + name: str, + section: HttpCrawlerSection, + config: Config, + shared_auth: Optional[Authenticator] = None, ) -> None: super().__init__(name, section, config) @@ -43,7 +39,7 @@ class HttpCrawler(Crawler): self._http_timeout = section.http_timeout() self._cookie_jar_path = self._output_dir.resolve(self.COOKIE_FILE) - 
self._shared_cookie_jar_paths: Optional[list[Path]] = None + self._shared_cookie_jar_paths: Optional[List[Path]] = None self._shared_auth = shared_auth self._output_dir.register_reserved(self.COOKIE_FILE) @@ -98,7 +94,7 @@ class HttpCrawler(Crawler): """ raise RuntimeError("_authenticate() was called but crawler doesn't provide an implementation") - def share_cookies(self, shared: dict[Authenticator, list[Path]]) -> None: + def share_cookies(self, shared: Dict[Authenticator, List[Path]]) -> None: if not self._shared_auth: return @@ -173,102 +169,24 @@ class HttpCrawler(Crawler): log.warn(f"Failed to save cookies to {fmt_real_path(self._cookie_jar_path)}") log.warn(str(e)) - @staticmethod - def get_folder_structure_from_heading_hierarchy(file_link: Tag, drop_h1: bool = False) -> PurePath: - """ - Retrieves the hierarchy of headings associated with the give file link and constructs a folder - structure from them. - -

<h1> level headings usually only appear once and serve as the page title, so they would introduce - redundant nesting. To avoid this, <h1> headings are ignored via the drop_h1 parameter. - """ - - def find_associated_headings(tag: Tag, level: int) -> PurePath: - if level == 0 or (level == 1 and drop_h1): - return PurePath() - - level_heading = tag.find_previous(name=f"h{level}") - - if level_heading is None: - return find_associated_headings(tag, level - 1) - - folder_name = sanitize_path_name(level_heading.get_text().strip()) - return find_associated_headings(level_heading, level - 1) / folder_name - - # start at level <h3>
because paragraph-level headings are usually too granular for folder names - return find_associated_headings(file_link, 3) - - def _get_previous_etag_from_report(self, path: PurePath) -> Optional[str]: - """ - If available, retrieves the entity tag for a given path which was stored in the previous report. - """ - if not self._output_dir.prev_report: - return None - - etags = self._output_dir.prev_report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} - return etags.get(str(path)) - - def _add_etag_to_report(self, path: PurePath, etag: Optional[str]) -> None: - """ - Adds an entity tag for a given path to the report's custom values. - """ - if not etag: - return - - etags = self._output_dir.report.get_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY) or {} - etags[str(path)] = etag - self._output_dir.report.add_custom_value(ETAGS_CUSTOM_REPORT_VALUE_KEY, etags) - - async def _request_resource_version(self, resource_url: str) -> tuple[Optional[str], Optional[datetime]]: - """ - Requests the ETag and Last-Modified headers of a resource via a HEAD request. - If no entity tag / modification date can be obtained, the according value will be None. 
- """ - try: - async with self.session.head(resource_url) as resp: - if resp.status != 200: - return None, None - - etag_header = resp.headers.get("ETag") - last_modified_header = resp.headers.get("Last-Modified") - last_modified = None - - if last_modified_header: - try: - # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#directives - datetime_format = "%a, %d %b %Y %H:%M:%S GMT" - last_modified = datetime.strptime(last_modified_header, datetime_format) - except ValueError: - # last_modified remains None - pass - - return etag_header, last_modified - except aiohttp.ClientError: - return None, None - async def run(self) -> None: self._request_count = 0 self._cookie_jar = aiohttp.CookieJar() self._load_cookies() async with aiohttp.ClientSession( - headers={"User-Agent": f"{NAME}/{VERSION}"}, - cookie_jar=self._cookie_jar, - connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), - timeout=ClientTimeout( - # 30 minutes. No download in the history of downloads was longer than 30 minutes. - # This is enough to transfer a 600 MB file over a 3 Mib/s connection. - # Allowing an arbitrary value could be annoying for overnight batch jobs - total=15 * 60, - connect=self._http_timeout, - sock_connect=self._http_timeout, - sock_read=self._http_timeout, - ), - # See https://github.com/aio-libs/aiohttp/issues/6626 - # Without this aiohttp will mangle the redirect header from Shibboleth, invalidating the - # passed signature. Shibboleth will not accept the broken signature and authentication will - # fail. - requote_redirect_url=False, + headers={"User-Agent": f"{NAME}/{VERSION}"}, + cookie_jar=self._cookie_jar, + connector=aiohttp.TCPConnector(ssl=ssl.create_default_context(cafile=certifi.where())), + timeout=ClientTimeout( + # 30 minutes. No download in the history of downloads was longer than 30 minutes. + # This is enough to transfer a 600 MB file over a 3 Mib/s connection. 
+ # Allowing an arbitrary value could be annoying for overnight batch jobs + total=15 * 60, + connect=self._http_timeout, + sock_connect=self._http_timeout, + sock_read=self._http_timeout, + ) ) as session: self.session = session try: diff --git a/PFERD/crawl/ilias/__init__.py b/PFERD/crawl/ilias/__init__.py index fa1aaed..26618a8 100644 --- a/PFERD/crawl/ilias/__init__.py +++ b/PFERD/crawl/ilias/__init__.py @@ -1,13 +1,3 @@ -from .kit_ilias_web_crawler import ( - IliasWebCrawler, - IliasWebCrawlerSection, - KitIliasWebCrawler, - KitIliasWebCrawlerSection, -) +from .kit_ilias_web_crawler import KitIliasWebCrawler, KitIliasWebCrawlerSection -__all__ = [ - "IliasWebCrawler", - "IliasWebCrawlerSection", - "KitIliasWebCrawler", - "KitIliasWebCrawlerSection", -] +__all__ = ["KitIliasWebCrawler", "KitIliasWebCrawlerSection"] diff --git a/PFERD/crawl/ilias/async_helper.py b/PFERD/crawl/ilias/async_helper.py deleted file mode 100644 index 2e6b301..0000000 --- a/PFERD/crawl/ilias/async_helper.py +++ /dev/null @@ -1,41 +0,0 @@ -import asyncio -from collections.abc import Callable -from typing import Any, Optional - -import aiohttp - -from ...logging import log -from ..crawler import AWrapped, CrawlError, CrawlWarning - - -def _iorepeat(attempts: int, name: str, failure_is_error: bool = False) -> Callable[[AWrapped], AWrapped]: - def decorator(f: AWrapped) -> AWrapped: - async def wrapper(*args: Any, **kwargs: Any) -> Optional[Any]: - last_exception: Optional[BaseException] = None - for round in range(attempts): - try: - return await f(*args, **kwargs) - except aiohttp.ContentTypeError: # invalid content type - raise CrawlWarning("ILIAS returned an invalid content type") from None - except aiohttp.TooManyRedirects: - raise CrawlWarning("Got stuck in a redirect loop") from None - except aiohttp.ClientPayloadError as e: # encoding or not enough bytes - last_exception = e - except aiohttp.ClientConnectionError as e: # e.g. timeout, disconnect, resolve failed, etc. 
- last_exception = e - except asyncio.exceptions.TimeoutError as e: # explicit http timeouts in HttpCrawler - last_exception = e - log.explain_topic(f"Retrying operation {name}. Retries left: {attempts - 1 - round}") - log.explain(f"Last exception: {last_exception!r}") - - if last_exception: - message = f"Error in I/O Operation: {last_exception!r}" - if failure_is_error: - raise CrawlError(message) from last_exception - else: - raise CrawlWarning(message) from last_exception - raise CrawlError("Impossible return in ilias _iorepeat") - - return wrapper # type: ignore - - return decorator diff --git a/PFERD/crawl/ilias/file_templates.py b/PFERD/crawl/ilias/file_templates.py index c832977..b206461 100644 --- a/PFERD/crawl/ilias/file_templates.py +++ b/PFERD/crawl/ilias/file_templates.py @@ -1,7 +1,5 @@ -import dataclasses -import re from enum import Enum -from typing import Optional, cast +from typing import Optional import bs4 @@ -14,9 +12,7 @@ _link_template_fancy = """ ILIAS - Link: {{name}} - - -
- -
- -
-
- {{name}} -
-
{{description}}
-
- +
+ - +
+
+ {{name}} +
+
{{description}}
+
+
@@ -111,7 +96,6 @@ _link_template_fancy = """ _link_template_internet_shortcut = """ [InternetShortcut] URL={{link}} -Desc={{description}} """.strip() _learning_module_template = """ @@ -142,88 +126,6 @@ _learning_module_template = """ """ -_forum_thread_template = """ - - - - - ILIAS - Forum: {{name}} - - - - {{heading}} - {{content}} - - -""".strip() # noqa: E501 line too long - def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next: Optional[str]) -> str: # Seems to be comments, ignore those. @@ -237,13 +139,13 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next
""" if prev and body.select_one(".ilc_page_lnav_LeftNavigation"): - text = cast(bs4.Tag, body.select_one(".ilc_page_lnav_LeftNavigation")).get_text().strip() + text = body.select_one(".ilc_page_lnav_LeftNavigation").getText().strip() left = f'{text}' else: left = "" if next and body.select_one(".ilc_page_rnav_RightNavigation"): - text = cast(bs4.Tag, body.select_one(".ilc_page_rnav_RightNavigation")).get_text().strip() + text = body.select_one(".ilc_page_rnav_RightNavigation").getText().strip() right = f'{text}' else: right = "" @@ -254,29 +156,12 @@ def learning_module_template(body: bs4.Tag, name: str, prev: Optional[str], next ) if bot_nav := body.select_one(".ilc_page_bnav_BottomNavigation"): - bot_nav.replace_with( - soupify(nav_template.replace("{{left}}", left).replace("{{right}}", right).encode()) + bot_nav.replace_with(soupify(nav_template.replace( + "{{left}}", left).replace("{{right}}", right).encode()) ) - body_str = body.prettify() - return _learning_module_template.replace("{{body}}", body_str).replace("{{name}}", name) - - -def forum_thread_template(name: str, url: str, heading: bs4.Tag, content: bs4.Tag) -> str: - if title := heading.find(name="b"): - title.wrap(bs4.Tag(name="a", attrs={"href": url})) - return ( - _forum_thread_template.replace("{{name}}", name) - .replace("{{heading}}", heading.prettify()) - .replace("{{content}}", content.prettify()) - ) - - -@dataclasses.dataclass -class LinkData: - name: str - url: str - description: str + body = body.prettify() + return _learning_module_template.replace("{{body}}", body).replace("{{name}}", name) class Links(Enum): @@ -296,9 +181,6 @@ class Links(Enum): return None raise ValueError("Missing switch case") - def collection_as_one(self) -> bool: - return self == Links.FANCY - def extension(self) -> Optional[str]: if self == Links.FANCY: return ".html" @@ -310,47 +192,10 @@ class Links(Enum): return None raise ValueError("Missing switch case") - def interpolate(self, redirect_delay: int, 
collection_name: str, links: list[LinkData]) -> str: - template = self.template() - if template is None: - raise ValueError("Cannot interpolate ignored links") - - if len(links) == 1: - link = links[0] - content = template - content = content.replace("{{link}}", link.url) - content = content.replace("{{name}}", link.name) - content = content.replace("{{description}}", link.description) - content = content.replace("{{redirect_delay}}", str(redirect_delay)) - return content - if self == Links.PLAINTEXT or self == Links.INTERNET_SHORTCUT: - return "\n".join(f"{link.url}" for link in links) - - # All others get coerced to fancy - content = cast(str, Links.FANCY.template()) - repeated_content = cast( - re.Match[str], re.search(r"([\s\S]+)", content) - ).group(1) - - parts = [] - for link in links: - instance = repeated_content - instance = instance.replace("{{link}}", link.url) - instance = instance.replace("{{name}}", link.name) - instance = instance.replace("{{description}}", link.description) - instance = instance.replace("{{redirect_delay}}", str(redirect_delay)) - parts.append(instance) - - content = content.replace(repeated_content, "\n".join(parts)) - content = content.replace("{{name}}", collection_name) - content = re.sub(r"[\s\S]+", "", content) - - return content - @staticmethod def from_string(string: str) -> "Links": try: return Links(string) except ValueError: - options = [f"'{option.value}'" for option in Links] - raise ValueError(f"must be one of {', '.join(options)}") from None + raise ValueError("must be one of 'ignore', 'plaintext'," + " 'html', 'internet-shortcut'") diff --git a/PFERD/crawl/ilias/ilias_html_cleaner.py b/PFERD/crawl/ilias/ilias_html_cleaner.py index 35a7ea0..5495304 100644 --- a/PFERD/crawl/ilias/ilias_html_cleaner.py +++ b/PFERD/crawl/ilias/ilias_html_cleaner.py @@ -1,5 +1,3 @@ -from typing import cast - from bs4 import BeautifulSoup, Comment, Tag _STYLE_TAG_CONTENT = """ @@ -14,13 +12,6 @@ _STYLE_TAG_CONTENT = """ font-weight: bold; 
} - .row-flex { - display: flex; - } - .row-flex-wrap { - flex-wrap: wrap; - } - .accordion-head { background-color: #f5f7fa; padding: 0.5rem 0; @@ -39,10 +30,6 @@ _STYLE_TAG_CONTENT = """ margin: 0.5rem 0; } - img { - background-color: white; - } - body { padding: 1em; grid-template-columns: 1fr min(60rem, 90%) 1fr; @@ -60,11 +47,12 @@ _ARTICLE_WORTHY_CLASSES = [ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: head = soup.new_tag("head") soup.insert(0, head) - # Force UTF-8 encoding - head.append(soup.new_tag("meta", charset="utf-8")) + simplecss_link: Tag = soup.new_tag("link") # - head.append(soup.new_tag("link", rel="stylesheet", href="https://cdn.simplecss.org/simple.css")) + simplecss_link["rel"] = "stylesheet" + simplecss_link["href"] = "https://cdn.simplecss.org/simple.css" + head.append(simplecss_link) # Basic style tags for compat style: Tag = soup.new_tag("style") @@ -75,18 +63,18 @@ def insert_base_markup(soup: BeautifulSoup) -> BeautifulSoup: def clean(soup: BeautifulSoup) -> BeautifulSoup: - for block in cast(list[Tag], soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES)): + for block in soup.find_all(class_=lambda x: x in _ARTICLE_WORTHY_CLASSES): block.name = "article" - for block in cast(list[Tag], soup.find_all("h3")): + for block in soup.find_all("h3"): block.name = "div" - for block in cast(list[Tag], soup.find_all("h1")): + for block in soup.find_all("h1"): block.name = "h3" - for block in cast(list[Tag], soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap")): + for block in soup.find_all(class_="ilc_va_ihcap_VAccordIHeadCap"): block.name = "h3" - block["class"] += ["accordion-head"] # type: ignore + block["class"] += ["accordion-head"] for dummy in soup.select(".ilc_text_block_Standard.ilc_Paragraph"): children = list(dummy.children) @@ -97,12 +85,7 @@ def clean(soup: BeautifulSoup) -> BeautifulSoup: if isinstance(type(children[0]), Comment): dummy.decompose() - # Delete video figures, as they can not be internalized 
anyway - for video in soup.select(".ilc_media_cont_MediaContainerHighlighted .ilPageVideo"): - if figure := video.find_parent("figure"): - figure.decompose() - - for hrule_imposter in cast(list[Tag], soup.find_all(class_="ilc_section_Separator")): + for hrule_imposter in soup.find_all(class_="ilc_section_Separator"): hrule_imposter.insert(0, soup.new_tag("hr")) return soup diff --git a/PFERD/crawl/ilias/ilias_web_crawler.py b/PFERD/crawl/ilias/ilias_web_crawler.py deleted file mode 100644 index b5041b3..0000000 --- a/PFERD/crawl/ilias/ilias_web_crawler.py +++ /dev/null @@ -1,1074 +0,0 @@ -import asyncio -import base64 -import os -import re -from collections.abc import Awaitable, Coroutine -from pathlib import PurePath -from typing import Any, Literal, Optional, cast -from urllib.parse import urljoin - -import aiohttp -from aiohttp import hdrs -from bs4 import BeautifulSoup, Tag - -from ...auth import Authenticator -from ...config import Config -from ...logging import ProgressBar, log -from ...output_dir import FileSink, Redownload -from ...utils import fmt_path, sanitize_path_name, soupify, url_set_query_param -from ..crawler import CrawlError, CrawlToken, CrawlWarning, DownloadToken, anoncritical -from ..http_crawler import HttpCrawler, HttpCrawlerSection -from .async_helper import _iorepeat -from .file_templates import LinkData, Links, forum_thread_template, learning_module_template -from .ilias_html_cleaner import clean, insert_base_markup -from .kit_ilias_html import ( - IliasElementType, - IliasForumThread, - IliasLearningModulePage, - IliasPage, - IliasPageElement, - IliasSoup, - parse_ilias_forum_export, -) -from .shibboleth_login import ShibbolethLogin - -TargetType = str | int - - -class LoginTypeLocal: - def __init__(self, client_id: str): - self.client_id = client_id - - -class IliasWebCrawlerSection(HttpCrawlerSection): - def base_url(self) -> str: - base_url = self.s.get("base_url") - if not base_url: - self.missing_value("base_url") - - return 
base_url - - def login(self) -> Literal["shibboleth"] | LoginTypeLocal: - login_type = self.s.get("login_type") - if not login_type: - self.missing_value("login_type") - if login_type == "shibboleth": - return "shibboleth" - if login_type == "local": - client_id = self.s.get("client_id") - if not client_id: - self.missing_value("client_id") - return LoginTypeLocal(client_id) - - self.invalid_value("login_type", login_type, "Should be ") - - def tfa_auth(self, authenticators: dict[str, Authenticator]) -> Optional[Authenticator]: - value: Optional[str] = self.s.get("tfa_auth") - if value is None: - return None - auth = authenticators.get(value) - if auth is None: - self.invalid_value("tfa_auth", value, "No such auth section exists") - return auth - - def target(self) -> TargetType: - target = self.s.get("target") - if not target: - self.missing_value("target") - - if re.fullmatch(r"\d+", target): - # Course id - return int(target) - if target == "desktop": - # Full personal desktop - return target - if target.startswith(self.base_url()): - # URL - return target - - self.invalid_value("target", target, "Should be ") - - def links(self) -> Links: - type_str: Optional[str] = self.s.get("links") - - if type_str is None: - return Links.FANCY - - try: - return Links.from_string(type_str) - except ValueError as e: - self.invalid_value("links", type_str, str(e).capitalize()) - - def link_redirect_delay(self) -> int: - return self.s.getint("link_redirect_delay", fallback=-1) - - def videos(self) -> bool: - return self.s.getboolean("videos", fallback=False) - - def forums(self) -> bool: - return self.s.getboolean("forums", fallback=False) - - -_DIRECTORY_PAGES: set[IliasElementType] = { - IliasElementType.EXERCISE, - IliasElementType.EXERCISE_FILES, - IliasElementType.EXERCISE_OVERVIEW, - IliasElementType.FOLDER, - IliasElementType.INFO_TAB, - IliasElementType.MEDIACAST_VIDEO_FOLDER, - IliasElementType.MEETING, - IliasElementType.OPENCAST_VIDEO_FOLDER, - 
IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, -} - -_VIDEO_ELEMENTS: set[IliasElementType] = { - IliasElementType.MEDIACAST_VIDEO, - IliasElementType.MEDIACAST_VIDEO_FOLDER, - IliasElementType.OPENCAST_VIDEO, - IliasElementType.OPENCAST_VIDEO_FOLDER, - IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, - IliasElementType.OPENCAST_VIDEO_PLAYER, -} - - -def _get_video_cache_key(element: IliasPageElement) -> str: - return f"ilias-video-cache-{element.id()}" - - -# Crawler control flow: -# -# crawl_desktop -+ -# | -# crawl_course --+ -# | -# @_io_repeat | # retries internally (before the bar) -# +- crawl_url <-+ -# | -# | -# | @_wrap_io_exception # does not need to retry as children acquire bars -# +> crawl_ilias_element -+ -# ^ | -# | @_io_repeat | # retries internally (before the bar) -# +- crawl_ilias_page <---+ -# | | -# +> get_page | # Handles and retries authentication -# | -# @_io_repeat | # retries internally (before the bar) -# +- download_link <---+ -# | | -# +> resolve_target | # Handles and retries authentication -# | -# @_io_repeat | # retries internally (before the bar) -# +- download_video <---+ -# | | -# | @_io_repeat | # retries internally (before the bar) -# +- download_file <---+ -# | -# +> stream_from_url # Handles and retries authentication -class IliasWebCrawler(HttpCrawler): - def __init__( - self, - name: str, - section: IliasWebCrawlerSection, - config: Config, - authenticators: dict[str, Authenticator], - ): - # Setting a main authenticator for cookie sharing - auth = section.auth(authenticators) - super().__init__(name, section, config, shared_auth=auth) - - if section.tasks() > 1: - log.warn( - """ -Please avoid using too many parallel requests as these are the KIT ILIAS -instance's greatest bottleneck. 
- """.strip() - ) - - self._auth = auth - self._base_url = section.base_url() - self._tfa_auth = section.tfa_auth(authenticators) - - self._login_type = section.login() - if isinstance(self._login_type, LoginTypeLocal): - self._client_id = self._login_type.client_id - else: - self._shibboleth_login = ShibbolethLogin(self._base_url, self._auth, self._tfa_auth) - - self._target = section.target() - self._link_file_redirect_delay = section.link_redirect_delay() - self._links = section.links() - self._videos = section.videos() - self._forums = section.forums() - self._visited_urls: dict[str, PurePath] = dict() - - async def _run(self) -> None: - if isinstance(self._target, int): - log.explain_topic(f"Inferred crawl target: Course with id {self._target}") - await self._crawl_course(self._target) - elif self._target == "desktop": - log.explain_topic("Inferred crawl target: Personal desktop") - await self._crawl_desktop() - else: - log.explain_topic(f"Inferred crawl target: URL {self._target}") - await self._crawl_url(self._target) - - async def _crawl_course(self, course_id: int) -> None: - # Start crawling at the given course - root_url = url_set_query_param( - urljoin(self._base_url + "/", "goto.php"), - "target", - f"crs_{course_id}", - ) - - await self._crawl_url(root_url, expected_id=course_id) - - async def _crawl_desktop(self) -> None: - await self._crawl_url( - urljoin(self._base_url, "/ilias.php?baseClass=ilDashboardGUI&cmd=show"), crawl_nested_courses=True - ) - - async def _crawl_url( - self, url: str, expected_id: Optional[int] = None, crawl_nested_courses: bool = False - ) -> None: - if awaitable := await self._handle_ilias_page( - url, None, PurePath("."), expected_id, crawl_nested_courses - ): - await awaitable - - async def _handle_ilias_page( - self, - url: str, - current_element: Optional[IliasPageElement], - path: PurePath, - expected_course_id: Optional[int] = None, - crawl_nested_courses: bool = False, - ) -> Optional[Coroutine[Any, Any, None]]: - 
maybe_cl = await self.crawl(path) - if not maybe_cl: - return None - if current_element: - self._ensure_not_seen(current_element, path) - - return self._crawl_ilias_page( - url, current_element, maybe_cl, expected_course_id, crawl_nested_courses - ) - - @anoncritical - async def _crawl_ilias_page( - self, - url: str, - current_element: Optional[IliasPageElement], - cl: CrawlToken, - expected_course_id: Optional[int] = None, - crawl_nested_courses: bool = False, - ) -> None: - elements: list[IliasPageElement] = [] - # A list as variable redefinitions are not propagated to outer scopes - description: list[BeautifulSoup] = [] - - @_iorepeat(3, "crawling folder") - async def gather_elements() -> None: - elements.clear() - async with cl: - next_stage_url: Optional[str] = url - current_parent = current_element - page = None - - while next_stage_url: - soup = await self._get_page(next_stage_url) - log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") - log.explain(f"URL: {next_stage_url}") - - # If we expect to find a root course, enforce it - if current_parent is None and expected_course_id is not None: - perma_link = IliasPage.get_soup_permalink(soup) - if not perma_link or "crs/" not in perma_link: - raise CrawlError("Invalid course id? 
Didn't find anything looking like a course") - if str(expected_course_id) not in perma_link: - raise CrawlError(f"Expected course id {expected_course_id} but got {perma_link}") - - page = IliasPage(soup, current_parent) - if next_element := page.get_next_stage_element(): - current_parent = next_element - next_stage_url = next_element.url - else: - next_stage_url = None - - page = cast(IliasPage, page) - elements.extend(page.get_child_elements()) - if current_element is None and (info_tab := page.get_info_tab()): - elements.append(info_tab) - if description_string := page.get_description(): - description.append(description_string) - - # Fill up our task list with the found elements - await gather_elements() - - if description: - await self._download_description(cl.path, description[0]) - - elements.sort(key=lambda e: e.id()) - - tasks: list[Awaitable[None]] = [] - for element in elements: - if handle := await self._handle_ilias_element(cl.path, element, crawl_nested_courses): - tasks.append(asyncio.create_task(handle)) - - # And execute them - await self.gather(tasks) - - # These decorators only apply *to this method* and *NOT* to the returned - # awaitables! - # This method does not await the handlers but returns them instead. - # This ensures one level is handled at a time and name deduplication - # works correctly. - @anoncritical - async def _handle_ilias_element( - self, parent_path: PurePath, element: IliasPageElement, crawl_nested_courses: bool = False - ) -> Optional[Coroutine[Any, Any, None]]: - # element.name might contain `/` if the crawler created nested elements, - # so we can not sanitize it here. We trust in the output dir to thwart worst-case - # directory escape attacks. - element_path = PurePath(parent_path, element.name) - - # This is symptomatic of no access to the element, for example, because - # of time availability restrictions. 
- if "cmdClass=ilInfoScreenGUI" in element.url and "cmd=showSummary" in element.url: - log.explain( - "Skipping element as url points to info screen, " - "this should only happen with not-yet-released elements" - ) - return None - - if element.type in _VIDEO_ELEMENTS and not self._videos: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](enable with option 'videos')", - ) - return None - - if element.type == IliasElementType.FILE: - return await self._handle_file(element, element_path) - elif element.type == IliasElementType.FORUM: - if not self._forums: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](enable with option 'forums')", - ) - return None - return await self._handle_forum(element, element_path) - elif element.type == IliasElementType.TEST: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](tests contain no relevant data)", - ) - return None - elif element.type == IliasElementType.SURVEY: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](surveys contain no relevant data)", - ) - return None - elif element.type == IliasElementType.SCORM_LEARNING_MODULE: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](scorm learning modules are not supported)", - ) - return None - elif element.type == IliasElementType.LITERATURE_LIST: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](literature lists are not currently supported)", - ) - return None - elif element.type == IliasElementType.LEARNING_MODULE_HTML: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](HTML learning modules are not supported)", - ) - return None - elif element.type == IliasElementType.BLOG: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - 
"[bright_black](blogs are not currently supported)", - ) - return None - elif element.type == IliasElementType.DCL_RECORD_LIST: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](dcl record lists are not currently supported)", - ) - return None - elif element.type == IliasElementType.MEDIA_POOL: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](media pools are not currently supported)", - ) - return None - elif element.type == IliasElementType.COURSE: - if crawl_nested_courses: - return await self._handle_ilias_page(element.url, element, element_path) - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](not descending into linked course)", - ) - return None - elif element.type == IliasElementType.WIKI: - log.status( - "[bold bright_black]", - "Ignored", - fmt_path(element_path), - "[bright_black](wikis are not currently supported)", - ) - return None - elif element.type == IliasElementType.LEARNING_MODULE: - return await self._handle_learning_module(element, element_path) - elif element.type == IliasElementType.LINK: - return await self._handle_link(element, element_path) - elif element.type == IliasElementType.LINK_COLLECTION: - return await self._handle_link(element, element_path) - elif element.type == IliasElementType.BOOKING: - return await self._handle_booking(element, element_path) - elif element.type == IliasElementType.OPENCAST_VIDEO: - return await self._handle_file(element, element_path) - elif element.type == IliasElementType.OPENCAST_VIDEO_PLAYER: - return await self._handle_opencast_video(element, element_path) - elif element.type == IliasElementType.MEDIACAST_VIDEO: - return await self._handle_file(element, element_path) - elif element.type == IliasElementType.MOB_VIDEO: - return await self._handle_file(element, element_path, is_video=True) - elif element.type in _DIRECTORY_PAGES: - return await 
self._handle_ilias_page(element.url, element, element_path) - else: - # This will retry it a few times, failing everytime. It doesn't make any network - # requests, so that's fine. - raise CrawlWarning(f"Unknown element type: {element.type!r}") - - async def _handle_link( - self, - element: IliasPageElement, - element_path: PurePath, - ) -> Optional[Coroutine[Any, Any, None]]: - log.explain_topic(f"Decision: Crawl Link {fmt_path(element_path)}") - log.explain(f"Links type is {self._links}") - - export_url = url_set_query_param(element.url, "cmd", "exportHTML") - resolved = await self._resolve_link_target(export_url) - if resolved == "none": - links = [LinkData(element.name, "", element.description or "")] - else: - links = self._parse_link_content(element, cast(BeautifulSoup, resolved)) - - maybe_extension = self._links.extension() - - if not maybe_extension: - log.explain("Answer: No") - return None - else: - log.explain("Answer: Yes") - - if len(links) <= 1 or self._links.collection_as_one(): - element_path = element_path.with_name(element_path.name + maybe_extension) - maybe_dl = await self.download(element_path, mtime=element.mtime) - if not maybe_dl: - return None - return self._download_link(self._links, element.name, links, maybe_dl) - - maybe_cl = await self.crawl(element_path) - if not maybe_cl: - return None - # Required for download_all closure - cl = maybe_cl - extension = maybe_extension - - async def download_all() -> None: - for link in links: - path = cl.path / (sanitize_path_name(link.name) + extension) - if dl := await self.download(path, mtime=element.mtime): - await self._download_link(self._links, element.name, [link], dl) - - return download_all() - - @anoncritical - @_iorepeat(3, "resolving link") - async def _download_link( - self, link_renderer: Links, collection_name: str, links: list[LinkData], dl: DownloadToken - ) -> None: - async with dl as (bar, sink): - rendered = link_renderer.interpolate(self._link_file_redirect_delay, 
collection_name, links) - sink.file.write(rendered.encode("utf-8")) - sink.done() - - async def _resolve_link_target(self, export_url: str) -> BeautifulSoup | Literal["none"]: - async def impl() -> Optional[BeautifulSoup | Literal["none"]]: - async with self.session.get(export_url, allow_redirects=False) as resp: - # No redirect means we were authenticated - if hdrs.LOCATION not in resp.headers: - return soupify(await resp.read()) # .select_one("a").get("href").strip() # type: ignore - # We are either unauthenticated or the link is not active - new_url = resp.headers[hdrs.LOCATION].lower() - if "baseclass=illinkresourcehandlergui" in new_url and "cmd=infoscreen" in new_url: - return "none" - return None - - auth_id = await self._current_auth_id() - target = await impl() - if target is not None: - return target - - await self.authenticate(auth_id) - - target = await impl() - if target is not None: - return target - - raise CrawlError("resolve_link_target failed even after authenticating") - - @staticmethod - def _parse_link_content(element: IliasPageElement, content: BeautifulSoup) -> list[LinkData]: - links = list(content.select("a")) - if len(links) == 1: - url = str(links[0].get("href")).strip() - return [LinkData(name=element.name, description=element.description or "", url=url)] - - results = [] - for link in links: - url = str(link.get("href")).strip() - name = link.get_text(strip=True) - description = cast(Tag, link.find_next_sibling("dd")).get_text(strip=True) - results.append(LinkData(name=name, description=description, url=url.strip())) - - return results - - async def _handle_booking( - self, - element: IliasPageElement, - element_path: PurePath, - ) -> Optional[Coroutine[Any, Any, None]]: - log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}") - log.explain(f"Links type is {self._links}") - - link_template_maybe = self._links.template() - link_extension = self._links.extension() - if not link_template_maybe or not link_extension: - 
log.explain("Answer: No") - return None - else: - log.explain("Answer: Yes") - element_path = element_path.with_name(element_path.name + link_extension) - - maybe_dl = await self.download(element_path, mtime=element.mtime) - if not maybe_dl: - return None - - self._ensure_not_seen(element, element_path) - - return self._download_booking(element, maybe_dl) - - @anoncritical - @_iorepeat(1, "downloading description") - async def _download_description(self, parent_path: PurePath, description: BeautifulSoup) -> None: - path = parent_path / "Description.html" - dl = await self.download(path, redownload=Redownload.ALWAYS) - if not dl: - return - - async with dl as (_bar, sink): - description = clean(insert_base_markup(description)) - description_tag = await self.internalize_images(description) - sink.file.write(description_tag.prettify().encode("utf-8")) - sink.done() - - @anoncritical - @_iorepeat(3, "resolving booking") - async def _download_booking( - self, - element: IliasPageElement, - dl: DownloadToken, - ) -> None: - async with dl as (bar, sink): - links = [LinkData(name=element.name, description=element.description or "", url=element.url)] - rendered = self._links.interpolate(self._link_file_redirect_delay, element.name, links) - sink.file.write(rendered.encode("utf-8")) - sink.done() - - async def _handle_opencast_video( - self, - element: IliasPageElement, - element_path: PurePath, - ) -> Optional[Coroutine[Any, Any, None]]: - # Copy old mapping as it is likely still relevant - if self.prev_report: - self.report.add_custom_value( - _get_video_cache_key(element), - self.prev_report.get_custom_value(_get_video_cache_key(element)), - ) - - # A video might contain other videos, so let's "crawl" the video first - # to ensure rate limits apply. This must be a download as *this token* - # is re-used if the video consists of a single stream. 
In that case the - # file name is used and *not* the stream name the ilias html parser reported - # to ensure backwards compatibility. - maybe_dl = await self.download(element_path, mtime=element.mtime, redownload=Redownload.ALWAYS) - - # If we do not want to crawl it (user filter), we can move on - if not maybe_dl: - return None - - self._ensure_not_seen(element, element_path) - - # If we have every file from the cached mapping already, we can ignore this and bail - if self._all_opencast_videos_locally_present(element, maybe_dl.path): - # Mark all existing videos as known to ensure they do not get deleted during cleanup. - # We "downloaded" them, just without actually making a network request as we assumed - # they did not change. - contained = self._previous_contained_opencast_videos(element, maybe_dl.path) - if len(contained) > 1: - # Only do this if we threw away the original dl token, - # to not download single-stream videos twice - for video in contained: - await self.download(video) - - return None - - return self._download_opencast_video(element, maybe_dl) - - def _previous_contained_opencast_videos( - self, element: IliasPageElement, element_path: PurePath - ) -> list[PurePath]: - if not self.prev_report: - return [] - custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) - if not custom_value: - return [] - cached_value = cast(dict[str, Any], custom_value) - if "known_paths" not in cached_value or "own_path" not in cached_value: - log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}") - return [] - transformed_own_path = self._transformer.transform(element_path) - if cached_value["own_path"] != str(transformed_own_path): - log.explain( - f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}" - ) - return [] - return [PurePath(name) for name in cached_value["known_paths"]] - - def _all_opencast_videos_locally_present(self, element: IliasPageElement, 
element_path: PurePath) -> bool: - log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}") - if contained_videos := self._previous_contained_opencast_videos(element, element_path): - log.explain( - f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}" - ) - if all(self._output_dir.resolve(path).exists() for path in contained_videos): - log.explain("Found all known videos locally, skipping enumeration request") - return True - log.explain("Missing at least one video, continuing with requests!") - else: - log.explain("No local cache present") - return False - - @anoncritical - @_iorepeat(3, "downloading video") - async def _download_opencast_video(self, element: IliasPageElement, dl: DownloadToken) -> None: - def add_to_report(paths: list[str]) -> None: - self.report.add_custom_value( - _get_video_cache_key(element), - {"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))}, - ) - - async with dl as (bar, sink): - page = IliasPage(await self._get_page(element.url), element) - stream_elements = page.get_child_elements() - - if len(stream_elements) > 1: - log.explain(f"Found multiple video streams for {element.name}") - else: - log.explain(f"Using single video mode for {element.name}") - stream_element = stream_elements[0] - - # We do not have a local cache yet - await self._stream_from_url(stream_element, sink, bar, is_video=True) - add_to_report([str(self._transformer.transform(dl.path))]) - return - - contained_video_paths: list[str] = [] - - for stream_element in stream_elements: - video_path = dl.path.parent / stream_element.name - - maybe_dl = await self.download(video_path, mtime=element.mtime, redownload=Redownload.NEVER) - if not maybe_dl: - continue - async with maybe_dl as (bar, sink): - log.explain(f"Streaming video from real url {stream_element.url}") - contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) - await self._stream_from_url(stream_element, 
sink, bar, is_video=True) - - add_to_report(contained_video_paths) - - async def _handle_file( - self, - element: IliasPageElement, - element_path: PurePath, - is_video: bool = False, - ) -> Optional[Coroutine[Any, Any, None]]: - maybe_dl = await self.download(element_path, mtime=element.mtime) - if not maybe_dl: - return None - self._ensure_not_seen(element, element_path) - - return self._download_file(element, maybe_dl, is_video) - - @_iorepeat(3, "downloading file") - @anoncritical - async def _download_file(self, element: IliasPageElement, dl: DownloadToken, is_video: bool) -> None: - assert dl # The function is only reached when dl is not None - async with dl as (bar, sink): - await self._stream_from_url(element, sink, bar, is_video) - - async def _stream_from_url( - self, element: IliasPageElement, sink: FileSink, bar: ProgressBar, is_video: bool - ) -> None: - url = element.url - - async def try_stream() -> bool: - next_url = url - # Normal files redirect to the magazine if we are not authenticated. As files could be HTML, - # we can not match on the content type here. Instead, we disallow redirects and inspect the - # new location. If we are redirected anywhere but the ILIAS 8 "sendfile" command, we assume - # our authentication expired. - if not is_video: - async with self.session.get(url, allow_redirects=False) as resp: - # Redirect to anything except a "sendfile" means we weren't authenticated - if hdrs.LOCATION in resp.headers: - if "&cmd=sendfile" not in resp.headers[hdrs.LOCATION]: - return False - # Directly follow the redirect to not make a second, unnecessary request - next_url = resp.headers[hdrs.LOCATION] - - # Let's try this again and follow redirects - return await fetch_follow_redirects(next_url) - - async def fetch_follow_redirects(file_url: str) -> bool: - async with self.session.get(file_url) as resp: - # We wanted a video but got HTML => Forbidden, auth expired. 
Logging in won't really - # solve that depending on the setup, but it is better than nothing. - if is_video and "html" in resp.content_type: - return False - - # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range - if content_range := resp.headers.get(hdrs.CONTENT_RANGE, default=None): - parts = content_range.split("/") - if len(parts) == 2 and parts[1].isdigit(): - bar.set_total(int(parts[1])) - - # Prefer the content length header - if resp.content_length: - bar.set_total(resp.content_length) - - async for data in resp.content.iter_chunked(1024): - sink.file.write(data) - bar.advance(len(data)) - - sink.done() - return True - - auth_id = await self._current_auth_id() - if await try_stream(): - return - - await self.authenticate(auth_id) - - if not await try_stream(): - raise CrawlError(f"File streaming failed after authenticate() {element!r}") - - async def _handle_forum( - self, - element: IliasPageElement, - element_path: PurePath, - ) -> Optional[Coroutine[Any, Any, None]]: - maybe_cl = await self.crawl(element_path) - if not maybe_cl: - return None - return self._crawl_forum(element, maybe_cl) - - @_iorepeat(3, "crawling forum") - @anoncritical - async def _crawl_forum(self, element: IliasPageElement, cl: CrawlToken) -> None: - async with cl: - inner = IliasPage(await self._get_page(element.url), element) - export_url = inner.get_forum_export_url() - if not export_url: - log.warn("Could not extract forum export url") - return - - export = await self._post( - export_url, - {"format": "html", "cmd[createExportFile]": ""}, - ) - - elements = parse_ilias_forum_export(soupify(export)) - - tasks: list[Awaitable[None]] = [] - for thread in elements: - tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, thread, element.url))) - - # And execute them - await self.gather(tasks) - - @anoncritical - @_iorepeat(3, "saving forum thread") - async def _download_forum_thread( - self, parent_path: PurePath, thread: IliasForumThread | 
IliasPageElement, forum_url: str - ) -> None: - path = parent_path / (sanitize_path_name(thread.name) + ".html") - maybe_dl = await self.download(path, mtime=thread.mtime) - if not maybe_dl or not isinstance(thread, IliasForumThread): - return - - async with maybe_dl as (bar, sink): - rendered = forum_thread_template( - thread.name, forum_url, thread.name_tag, await self.internalize_images(thread.content_tag) - ) - sink.file.write(rendered.encode("utf-8")) - sink.done() - - async def _handle_learning_module( - self, - element: IliasPageElement, - element_path: PurePath, - ) -> Optional[Coroutine[Any, Any, None]]: - maybe_cl = await self.crawl(element_path) - if not maybe_cl: - return None - self._ensure_not_seen(element, element_path) - - return self._crawl_learning_module(element, maybe_cl) - - @_iorepeat(3, "crawling learning module") - @anoncritical - async def _crawl_learning_module(self, element: IliasPageElement, cl: CrawlToken) -> None: - elements: list[IliasLearningModulePage] = [] - - async with cl: - log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") - log.explain(f"URL: {element.url}") - soup = await self._get_page(element.url) - page = IliasPage(soup, element) - if next := page.get_learning_module_data(): - elements.extend( - await self._crawl_learning_module_direction(cl.path, next.previous_url, "left", element) - ) - elements.append(next) - elements.extend( - await self._crawl_learning_module_direction(cl.path, next.next_url, "right", element) - ) - - # Reflect their natural ordering in the file names - for index, lm_element in enumerate(elements): - lm_element.title = f"{index:02}_{lm_element.title}" - - tasks: list[Awaitable[None]] = [] - for index, elem in enumerate(elements): - prev_url = elements[index - 1].title if index > 0 else None - next_url = elements[index + 1].title if index < len(elements) - 1 else None - tasks.append( - asyncio.create_task(self._download_learning_module_page(cl.path, elem, prev_url, next_url)) - ) - 
- # And execute them - await self.gather(tasks) - - async def _crawl_learning_module_direction( - self, - path: PurePath, - start_url: Optional[str], - dir: Literal["left"] | Literal["right"], - parent_element: IliasPageElement, - ) -> list[IliasLearningModulePage]: - elements: list[IliasLearningModulePage] = [] - - if not start_url: - return elements - - next_element_url: Optional[str] = start_url - counter = 0 - while next_element_url: - log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") - log.explain(f"URL: {next_element_url}") - soup = await self._get_page(next_element_url) - page = IliasPage(soup, parent_element) - if next := page.get_learning_module_data(): - elements.append(next) - next_element_url = next.previous_url if dir == "left" else next.next_url - counter += 1 - - return elements - - @anoncritical - @_iorepeat(3, "saving learning module page") - async def _download_learning_module_page( - self, - parent_path: PurePath, - element: IliasLearningModulePage, - prev: Optional[str], - next: Optional[str], - ) -> None: - path = parent_path / (sanitize_path_name(element.title) + ".html") - maybe_dl = await self.download(path) - if not maybe_dl: - return - my_path = self._transformer.transform(maybe_dl.path) - if not my_path: - return - - if prev: - prev_p = self._transformer.transform(parent_path / (sanitize_path_name(prev) + ".html")) - prev = os.path.relpath(prev_p, my_path.parent) if prev_p else None - if next: - next_p = self._transformer.transform(parent_path / (sanitize_path_name(next) + ".html")) - next = os.path.relpath(next_p, my_path.parent) if next_p else None - - async with maybe_dl as (bar, sink): - content = element.content - content = await self.internalize_images(content) - sink.file.write(learning_module_template(content, maybe_dl.path.name, prev, next).encode("utf-8")) - sink.done() - - async def internalize_images(self, tag: Tag) -> Tag: - """ - Tries to fetch ILIAS images and embed them as base64 data. 
- """ - log.explain_topic("Internalizing images") - for elem in tag.find_all(recursive=True): - if elem.name == "img" and (src := elem.attrs.get("src", None)): - url = urljoin(self._base_url, cast(str, src)) - if not url.startswith(self._base_url): - continue - log.explain(f"Internalizing {url!r}") - img = await self._get_authenticated(url) - elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() - if elem.name == "iframe" and cast(str, elem.attrs.get("src", "")).startswith("//"): - # For unknown reasons the protocol seems to be stripped. - elem.attrs["src"] = "https:" + cast(str, elem.attrs["src"]) - return tag - - def _ensure_not_seen(self, element: IliasPageElement, parent_path: PurePath) -> None: - if element.url in self._visited_urls: - raise CrawlWarning( - f"Found second path to element {element.name!r} at {element.url!r}. " - + f"First path: {fmt_path(self._visited_urls[element.url])}. " - + f"Second path: {fmt_path(parent_path)}." - ) - self._visited_urls[element.url] = parent_path - - async def _get_page(self, url: str, root_page_allowed: bool = False) -> IliasSoup: - auth_id = await self._current_auth_id() - async with self.session.get(url) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) - if IliasPage.is_logged_in(soup): - return self._verify_page(soup, url, root_page_allowed) - - # We weren't authenticated, so try to do that - await self.authenticate(auth_id) - - # Retry once after authenticating. If this fails, we will die. 
- async with self.session.get(url) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) - if IliasPage.is_logged_in(soup): - return self._verify_page(soup, url, root_page_allowed) - raise CrawlError(f"get_page failed even after authenticating on {url!r}") - - @staticmethod - def _verify_page(soup: IliasSoup, url: str, root_page_allowed: bool) -> IliasSoup: - if IliasPage.is_root_page(soup) and not root_page_allowed: - raise CrawlError( - "Unexpectedly encountered ILIAS root page. " - "This usually happens because the ILIAS instance is broken. " - "If so, wait a day or two and try again. " - "It could also happen because a crawled element links to the ILIAS root page. " - "If so, use a transform with a ! as target to ignore the particular element. " - f"The redirect came from {url}" - ) - return soup - - async def _post(self, url: str, data: dict[str, str | list[str]]) -> bytes: - form_data = aiohttp.FormData() - for key, val in data.items(): - form_data.add_field(key, val) - - async with self.session.post(url, data=form_data()) as request: - if request.status == 200: - return await request.read() - raise CrawlError(f"post failed with status {request.status}") - - async def _get_authenticated(self, url: str) -> bytes: - auth_id = await self._current_auth_id() - - async with self.session.get(url, allow_redirects=False) as request: - if request.status == 200: - return await request.read() - - # We weren't authenticated, so try to do that - await self.authenticate(auth_id) - - # Retry once after authenticating. If this fails, we will die. 
- async with self.session.get(url, allow_redirects=False) as request: - if request.status == 200: - return await request.read() - raise CrawlError("get_authenticated failed even after authenticating") - - async def _authenticate(self) -> None: - # fill the session with the correct cookies - if self._login_type == "shibboleth": - await self._shibboleth_login.login(self.session) - else: - params = { - "client_id": self._client_id, - "cmd": "force_login", - } - async with self.session.get(urljoin(self._base_url, "/login.php"), params=params) as request: - login_page = soupify(await request.read()) - - login_form = login_page.find("form", attrs={"name": "login_form"}) - if login_form is None: - raise CrawlError("Could not find the login form! Specified client id might be invalid.") - - login_url = cast(Optional[str], login_form.attrs.get("action")) - if login_url is None: - raise CrawlError("Could not find the action URL in the login form!") - - username, password = await self._auth.credentials() - - login_form_data = aiohttp.FormData() - login_form_data.add_field("login_form/input_3/input_4", username) - login_form_data.add_field("login_form/input_3/input_5", password) - - # do the actual login - async with self.session.post(urljoin(self._base_url, login_url), data=login_form_data) as request: - soup = IliasSoup(soupify(await request.read()), str(request.url)) - if not IliasPage.is_logged_in(soup): - self._auth.invalidate_credentials() diff --git a/PFERD/crawl/ilias/kit_ilias_html.py b/PFERD/crawl/ilias/kit_ilias_html.py index 5966141..0be6448 100644 --- a/PFERD/crawl/ilias/kit_ilias_html.py +++ b/PFERD/crawl/ilias/kit_ilias_html.py @@ -1,245 +1,39 @@ import json import re -from collections.abc import Callable from dataclasses import dataclass from datetime import date, datetime, timedelta from enum import Enum -from typing import Optional, cast +from typing import Dict, List, Optional, Union, cast from urllib.parse import urljoin, urlparse from bs4 import 
BeautifulSoup, Tag -from PFERD.crawl import CrawlError -from PFERD.crawl.crawler import CrawlWarning from PFERD.logging import log -from PFERD.utils import sanitize_path_name, url_set_query_params +from PFERD.utils import url_set_query_params -TargetType = str | int - - -class TypeMatcher: - class UrlPath: - path: str - - def __init__(self, path: str): - self.path = path - - class UrlParameter: - query: str - - def __init__(self, query: str): - self.query = query - - class ImgSrc: - src: str - - def __init__(self, src: str): - self.src = src - - class ImgAlt: - alt: str - - def __init__(self, alt: str): - self.alt = alt - - class All: - matchers: list["IliasElementMatcher"] - - def __init__(self, matchers: list["IliasElementMatcher"]): - self.matchers = matchers - - class Any: - matchers: list["IliasElementMatcher"] - - def __init__(self, matchers: list["IliasElementMatcher"]): - self.matchers = matchers - - @staticmethod - def path(path: str) -> UrlPath: - return TypeMatcher.UrlPath(path) - - @staticmethod - def query(query: str) -> UrlParameter: - return TypeMatcher.UrlParameter(query) - - @staticmethod - def img_src(src: str) -> ImgSrc: - return TypeMatcher.ImgSrc(src) - - @staticmethod - def img_alt(alt: str) -> ImgAlt: - return TypeMatcher.ImgAlt(alt) - - @staticmethod - def all(*matchers: "IliasElementMatcher") -> All: - return TypeMatcher.All(list(matchers)) - - @staticmethod - def any(*matchers: "IliasElementMatcher") -> Any: - return TypeMatcher.Any(list(matchers)) - - @staticmethod - def never() -> Any: - return TypeMatcher.Any([]) - - -IliasElementMatcher = ( - TypeMatcher.UrlPath - | TypeMatcher.UrlParameter - | TypeMatcher.ImgSrc - | TypeMatcher.ImgAlt - | TypeMatcher.All - | TypeMatcher.Any -) +TargetType = Union[str, int] class IliasElementType(Enum): - BLOG = "blog" - BOOKING = "booking" - COURSE = "course" - DCL_RECORD_LIST = "dcl_record_list" - EXERCISE_OVERVIEW = "exercise_overview" - EXERCISE = "exercise" # own submitted files + EXERCISE = 
"exercise" EXERCISE_FILES = "exercise_files" # own submitted files + TEST = "test" # an online test. Will be ignored currently. FILE = "file" FOLDER = "folder" FORUM = "forum" - FORUM_THREAD = "forum_thread" + LINK = "link" INFO_TAB = "info_tab" LEARNING_MODULE = "learning_module" - LEARNING_MODULE_HTML = "learning_module_html" - LITERATURE_LIST = "literature_list" - LINK = "link" - LINK_COLLECTION = "link_collection" - MEDIA_POOL = "media_pool" - MEDIACAST_VIDEO = "mediacast_video" - MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + BOOKING = "booking" MEETING = "meeting" - MOB_VIDEO = "mob_video" + SURVEY = "survey" + SCORM_LEARNING_MODULE = "scorm_learning_module" + MEDIACAST_VIDEO_FOLDER = "mediacast_video_folder" + MEDIACAST_VIDEO = "mediacast_video" OPENCAST_VIDEO = "opencast_video" + OPENCAST_VIDEO_PLAYER = "opencast_video_player" OPENCAST_VIDEO_FOLDER = "opencast_video_folder" OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED = "opencast_video_folder_maybe_paginated" - OPENCAST_VIDEO_PLAYER = "opencast_video_player" - SCORM_LEARNING_MODULE = "scorm_learning_module" - SURVEY = "survey" - TEST = "test" # an online test. Will be ignored currently. 
- WIKI = "wiki" - - def matcher(self) -> IliasElementMatcher: - match self: - case IliasElementType.BLOG: - return TypeMatcher.any(TypeMatcher.img_src("_blog.svg")) - case IliasElementType.BOOKING: - return TypeMatcher.any(TypeMatcher.path("/book/"), TypeMatcher.img_src("_book.svg")) - case IliasElementType.COURSE: - return TypeMatcher.any(TypeMatcher.path("/crs/"), TypeMatcher.img_src("_crsr.svg")) - case IliasElementType.DCL_RECORD_LIST: - return TypeMatcher.any( - TypeMatcher.img_src("_dcl.svg"), TypeMatcher.query("cmdclass=ildclrecordlistgui") - ) - case IliasElementType.EXERCISE: - return TypeMatcher.never() - case IliasElementType.EXERCISE_FILES: - return TypeMatcher.never() - case IliasElementType.EXERCISE_OVERVIEW: - return TypeMatcher.any( - TypeMatcher.path("/exc/"), - TypeMatcher.path("_exc_"), - TypeMatcher.img_src("_exc.svg"), - ) - case IliasElementType.FILE: - return TypeMatcher.any( - TypeMatcher.query("cmd=sendfile"), - TypeMatcher.path("_file_"), - TypeMatcher.img_src("/filedelivery/"), - ) - case IliasElementType.FOLDER: - return TypeMatcher.any( - TypeMatcher.path("/fold/"), - TypeMatcher.img_src("_fold.svg"), - TypeMatcher.path("/grp/"), - TypeMatcher.img_src("_grp.svg"), - TypeMatcher.path("/copa/"), - TypeMatcher.path("_copa_"), - TypeMatcher.img_src("_copa.svg"), - # Not supported right now but warn users - # TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), - # TypeMatcher.img_alt("medienpool"), - # TypeMatcher.img_src("_mep.svg"), - ) - case IliasElementType.FORUM: - return TypeMatcher.any( - TypeMatcher.path("/frm/"), - TypeMatcher.path("_frm_"), - TypeMatcher.img_src("_frm.svg"), - ) - case IliasElementType.FORUM_THREAD: - return TypeMatcher.never() - case IliasElementType.INFO_TAB: - return TypeMatcher.never() - case IliasElementType.LITERATURE_LIST: - return TypeMatcher.img_src("_bibl.svg") - case IliasElementType.LEARNING_MODULE: - return TypeMatcher.any(TypeMatcher.path("/lm/"), TypeMatcher.img_src("_lm.svg")) - case 
IliasElementType.LEARNING_MODULE_HTML: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilhtlmpresentationgui"), TypeMatcher.img_src("_htlm.svg") - ) - case IliasElementType.LINK: - return TypeMatcher.any( - TypeMatcher.all( - TypeMatcher.query("baseclass=illinkresourcehandlergui"), - TypeMatcher.query("calldirectlink"), - ), - TypeMatcher.img_src("_webr.svg"), # duplicated :( - ) - case IliasElementType.LINK_COLLECTION: - return TypeMatcher.any( - TypeMatcher.query("baseclass=illinkresourcehandlergui"), - TypeMatcher.img_src("_webr.svg"), # duplicated :( - ) - case IliasElementType.MEDIA_POOL: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilmediapoolpresentationgui"), TypeMatcher.img_src("_mep.svg") - ) - case IliasElementType.MEDIACAST_VIDEO: - return TypeMatcher.never() - case IliasElementType.MEDIACAST_VIDEO_FOLDER: - return TypeMatcher.any( - TypeMatcher.path("/mcst/"), - TypeMatcher.query("baseclass=ilmediacasthandlergui"), - TypeMatcher.img_src("_mcst.svg"), - ) - case IliasElementType.MEETING: - return TypeMatcher.any(TypeMatcher.img_src("_sess.svg")) - case IliasElementType.MOB_VIDEO: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO_FOLDER: - return TypeMatcher.never() - case IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED: - return TypeMatcher.img_alt("opencast") - case IliasElementType.OPENCAST_VIDEO_PLAYER: - return TypeMatcher.never() - case IliasElementType.SCORM_LEARNING_MODULE: - return TypeMatcher.any( - TypeMatcher.query("baseclass=ilsahspresentationgui"), TypeMatcher.img_src("_sahs.svg") - ) - case IliasElementType.SURVEY: - return TypeMatcher.any(TypeMatcher.path("/svy/"), TypeMatcher.img_src("svy.svg")) - case IliasElementType.TEST: - return TypeMatcher.any( - TypeMatcher.query("cmdclass=ilobjtestgui"), - TypeMatcher.query("cmdclass=iltestscreengui"), - TypeMatcher.img_src("_tst.svg"), - ) - case IliasElementType.WIKI: - return 
TypeMatcher.any( - TypeMatcher.query("baseClass=ilwikihandlergui"), TypeMatcher.img_src("wiki.svg") - ) - - raise CrawlWarning(f"Unknown matcher {self}") @dataclass @@ -253,25 +47,10 @@ class IliasPageElement: def id(self) -> str: regexes = [ r"eid=(?P[0-9a-z\-]+)", - r"book/(?P\d+)", # booking - r"cat/(?P\d+)", - r"copa/(?P\d+)", # content page - r"crs/(?P\d+)", # course - r"exc/(?P\d+)", # exercise - r"file/(?P\d+)", # file - r"fold/(?P\d+)", # folder - r"frm/(?P\d+)", # forum - r"grp/(?P\d+)", # group - r"lm/(?P\d+)", # learning module - r"mcst/(?P\d+)", # mediacast - r"pg/(?P(\d|_)+)", # page? - r"svy/(?P\d+)", # survey - r"sess/(?P\d+)", # session - r"webr/(?P\d+)", # web referene (link) - r"thr_pk=(?P\d+)", # forums + r"file_(?P\d+)", r"ref_id=(?P\d+)", r"target=[a-z]+_(?P\d+)", - r"mm_(?P\d+)", + r"mm_(?P\d+)" ] for regex in regexes: @@ -282,64 +61,18 @@ class IliasPageElement: log.warn(f"Didn't find identity for {self.name} - {self.url}. Please report this.") return self.url - @staticmethod - def create_new( - typ: IliasElementType, - url: str, - name: str, - mtime: Optional[datetime] = None, - description: Optional[str] = None, - skip_sanitize: bool = False, - ) -> "IliasPageElement": - if typ == IliasElementType.MEETING: - normalized = IliasPageElement._normalize_meeting_name(name) - log.explain(f"Normalized meeting name from {name!r} to {normalized!r}") - name = normalized - - if not skip_sanitize: - name = sanitize_path_name(name) - - return IliasPageElement(typ, url, name, mtime, description) - - @staticmethod - def _normalize_meeting_name(meeting_name: str) -> str: - """ - Normalizes meeting names, which have a relative time as their first part, - to their date in ISO format. - """ - - # This checks whether we can reach a `:` without passing a `-` - if re.search(r"^[^-]+: ", meeting_name): # noqa: SIM108 - # Meeting name only contains date: "05. Jan 2000:" - split_delimiter = ":" - else: - # Meeting name contains date and start/end times: "05. 
Jan 2000, 16:00 - 17:30:" - split_delimiter = ", " - - # We have a meeting day without time - date_portion_str = meeting_name.split(split_delimiter)[0] - date_portion = demangle_date(date_portion_str) - - # We failed to parse the date, bail out - if not date_portion: - return meeting_name - - # Replace the first section with the absolute date - rest_of_name = split_delimiter.join(meeting_name.split(split_delimiter)[1:]) - return datetime.strftime(date_portion, "%Y-%m-%d") + split_delimiter + rest_of_name - @dataclass class IliasDownloadForumData: url: str - form_data: dict[str, str | list[str]] + form_data: Dict[str, Union[str, List[str]]] empty: bool @dataclass class IliasForumThread: - name: str - name_tag: Tag + title: str + title_tag: Tag content_tag: Tag mtime: Optional[datetime] @@ -352,30 +85,21 @@ class IliasLearningModulePage: previous_url: Optional[str] -class IliasSoup: - soup: BeautifulSoup - page_url: str - - def __init__(self, soup: BeautifulSoup, page_url: str): - self.soup = soup - self.page_url = page_url - - class IliasPage: - def __init__(self, ilias_soup: IliasSoup, source_element: Optional[IliasPageElement]): - self._ilias_soup = ilias_soup - self._soup = ilias_soup.soup - self._page_url = ilias_soup.page_url + + def __init__(self, soup: BeautifulSoup, _page_url: str, source_element: Optional[IliasPageElement]): + self._soup = soup + self._page_url = _page_url self._page_type = source_element.type if source_element else None self._source_name = source_element.name if source_element else "" @staticmethod - def is_root_page(soup: IliasSoup) -> bool: + def is_root_page(soup: BeautifulSoup) -> bool: if permalink := IliasPage.get_soup_permalink(soup): - return "goto.php/root/" in permalink + return "goto.php?target=root_" in permalink return False - def get_child_elements(self) -> list[IliasPageElement]: + def get_child_elements(self) -> List[IliasPageElement]: """ Return all child page elements you can find here. 
""" @@ -402,25 +126,22 @@ class IliasPage: def get_info_tab(self) -> Optional[IliasPageElement]: tab: Optional[Tag] = self._soup.find( - name="a", attrs={"href": lambda x: x is not None and "cmdClass=ilinfoscreengui" in x} + name="a", + attrs={"href": lambda x: x and "cmdClass=ilinfoscreengui" in x} ) if tab is not None: - return IliasPageElement.create_new( - IliasElementType.INFO_TAB, self._abs_url_from_link(tab), "infos" + return IliasPageElement( + IliasElementType.INFO_TAB, + self._abs_url_from_link(tab), + "infos" ) return None def get_description(self) -> Optional[BeautifulSoup]: - def is_interesting_class(name: str | None) -> bool: - return name in [ - "ilCOPageSection", - "ilc_Paragraph", - "ilc_va_ihcap_VAccordIHeadCap", - "ilc_va_ihcap_AccordIHeadCap", - "ilc_media_cont_MediaContainer", - ] + def is_interesting_class(name: str) -> bool: + return name in ["ilCOPageSection", "ilc_Paragraph", "ilc_va_ihcap_VAccordIHeadCap"] - paragraphs: list[Tag] = cast(list[Tag], self._soup.find_all(class_=is_interesting_class)) + paragraphs: List[Tag] = self._soup.findAll(class_=is_interesting_class) if not paragraphs: return None @@ -431,20 +152,6 @@ class IliasPage: for p in paragraphs: if p.find_parent(class_=is_interesting_class): continue - if "ilc_media_cont_MediaContainer" in p["class"] and (video := p.select_one("video")): - # We have an embedded video which should be downloaded by _find_mob_videos - url, title = self._find_mob_video_url_title(video, p) - raw_html += '
External Video: {title}' - else: - raw_html += f"Video elided. Filename: '{title}'." - raw_html += "
\n" - continue # Ignore special listings (like folder groupings) if "ilc_section_Special" in p["class"]: @@ -458,13 +165,13 @@ class IliasPage: def get_learning_module_data(self) -> Optional[IliasLearningModulePage]: if not self._is_learning_module_page(): return None - content = cast(Tag, self._soup.select_one("#ilLMPageContent")) - title = cast(Tag, self._soup.select_one(".ilc_page_title_PageTitle")).get_text().strip() + content = self._soup.select_one("#ilLMPageContent") + title = self._soup.select_one(".ilc_page_title_PageTitle").getText().strip() return IliasLearningModulePage( title=title, content=content, next_url=self._find_learning_module_next(), - previous_url=self._find_learning_module_prev(), + previous_url=self._find_learning_module_prev() ) def _find_learning_module_next(self) -> Optional[str]: @@ -483,28 +190,29 @@ class IliasPage: return url return None - def get_forum_export_url(self) -> Optional[str]: - forum_link = self._soup.select_one("#tab_forums_threads > a") - if not forum_link: - log.explain("Found no forum link") + def get_download_forum_data(self) -> Optional[IliasDownloadForumData]: + form = self._soup.find("form", attrs={"action": lambda x: x and "fallbackCmd=showThreads" in x}) + if not form: return None + post_url = self._abs_url_from_relative(form["action"]) - base_url = self._abs_url_from_link(forum_link) - base_url = re.sub(r"cmd=\w+", "cmd=post", base_url) - base_url = re.sub(r"cmdClass=\w+", "cmdClass=ilExportGUI", base_url) + thread_ids = [f["value"] for f in form.find_all(attrs={"name": "thread_ids[]"})] - rtoken_form = self._soup.find("form", attrs={"action": lambda x: x is not None and "rtoken=" in x}) - if not rtoken_form: - log.explain("Found no rtoken anywhere") - return None - match = cast(re.Match[str], re.search(r"rtoken=(\w+)", str(rtoken_form.attrs["action"]))) - rtoken = match.group(1) + form_data: Dict[str, Union[str, List[str]]] = { + "thread_ids[]": thread_ids, + "selected_cmd2": "html", + "select_cmd2": 
"Ausführen", + "selected_cmd": "", + } - base_url = base_url + "&rtoken=" + rtoken - - return base_url + return IliasDownloadForumData(url=post_url, form_data=form_data, empty=len(thread_ids) == 0) def get_next_stage_element(self) -> Optional[IliasPageElement]: + if self._is_forum_page(): + if "trows=800" in self._page_url: + return None + log.explain("Requesting *all* forum threads") + return self._get_show_max_forum_entries_per_page_url() if self._is_ilias_opencast_embedding(): log.explain("Unwrapping opencast embedding") return self.get_child_elements()[0] @@ -514,8 +222,6 @@ class IliasPage: if self._contains_collapsed_future_meetings(): log.explain("Requesting *all* future meetings") return self._uncollapse_future_meetings_url() - if self._is_exercise_not_all_shown(): - return self._show_all_exercises() if not self._is_content_tab_selected(): if self._page_type != IliasElementType.INFO_TAB: log.explain("Selecting content tab") @@ -524,6 +230,13 @@ class IliasPage: log.explain("Crawling info tab, skipping content select") return None + def _is_forum_page(self) -> bool: + read_more_btn = self._soup.find( + "button", + attrs={"onclick": lambda x: x and "cmdClass=ilobjforumgui&cmd=markAllRead" in x} + ) + return read_more_btn is not None + def _is_video_player(self) -> bool: return "paella_config_file" in str(self._soup) @@ -532,36 +245,38 @@ class IliasPage: return True # Raw listing without ILIAS fluff - video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + video_element_table: Tag = self._soup.find( + name="table", id=re.compile(r"tbl_xoct_.+") + ) return video_element_table is not None def _is_ilias_opencast_embedding(self) -> bool: # ILIAS fluff around the real opencast html if self._soup.find(id="headerimage"): - element: Tag = cast(Tag, self._soup.find(id="headerimage")) - if "opencast" in cast(str, element.attrs["src"]).lower(): + element: Tag = self._soup.find(id="headerimage") + if "opencast" in 
element.attrs["src"].lower(): return True return False def _is_exercise_file(self) -> bool: # we know it from before - if self._page_type == IliasElementType.EXERCISE_OVERVIEW: + if self._page_type == IliasElementType.EXERCISE: return True # We have no suitable parent - let's guesss if self._soup.find(id="headerimage"): - element: Tag = cast(Tag, self._soup.find(id="headerimage")) - if "exc" in cast(str, element.attrs["src"]).lower(): + element: Tag = self._soup.find(id="headerimage") + if "exc" in element.attrs["src"].lower(): return True return False def _is_personal_desktop(self) -> bool: - return "baseclass=ildashboardgui" in self._page_url.lower() and "&cmd=show" in self._page_url.lower() + return self._soup.find("a", attrs={"href": lambda x: x and "block_type=pditems" in x}) def _is_content_page(self) -> bool: if link := self.get_permalink(): - return "/copa/" in link + return "target=copa_" in link return False def _is_learning_module_page(self) -> bool: @@ -575,22 +290,12 @@ class IliasPage: def _uncollapse_future_meetings_url(self) -> Optional[IliasPageElement]: element = self._soup.find( "a", - attrs={"href": lambda x: x is not None and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)}, + attrs={"href": lambda x: x and ("crs_next_sess=1" in x or "crs_prev_sess=1" in x)} ) if not element: return None link = self._abs_url_from_link(element) - return IliasPageElement.create_new(IliasElementType.FOLDER, link, "show all meetings") - - def _is_exercise_not_all_shown(self) -> bool: - return ( - self._page_type == IliasElementType.EXERCISE_OVERVIEW and "mode=all" not in self._page_url.lower() - ) - - def _show_all_exercises(self) -> Optional[IliasPageElement]: - return IliasPageElement.create_new( - IliasElementType.EXERCISE_OVERVIEW, self._page_url + "&mode=all", "show all exercises" - ) + return IliasPageElement(IliasElementType.FOLDER, link, "show all meetings") def _is_content_tab_selected(self) -> bool: return self._select_content_page_url() is None @@ 
-599,33 +304,33 @@ class IliasPage: might_be_info = self._soup.find("form", attrs={"name": lambda x: x == "formInfoScreen"}) is not None return self._page_type == IliasElementType.INFO_TAB and might_be_info - def _is_course_overview_page(self) -> bool: - return "baseClass=ilmembershipoverviewgui" in self._page_url - def _select_content_page_url(self) -> Optional[IliasPageElement]: tab = self._soup.find( - id="tab_view_content", attrs={"class": lambda x: x is not None and "active" not in x} + id="tab_view_content", + attrs={"class": lambda x: x is not None and "active" not in x} ) # Already selected (or not found) if not tab: return None link = tab.find("a") if link: - link_str = self._abs_url_from_link(link) - return IliasPageElement.create_new(IliasElementType.FOLDER, link_str, "select content page") + link = self._abs_url_from_link(link) + return IliasPageElement(IliasElementType.FOLDER, link, "select content page") _unexpected_html_warning() log.warn_contd(f"Could not find content tab URL on {self._page_url!r}.") log.warn_contd("PFERD might not find content on the course's main page.") return None - def _player_to_video(self) -> list[IliasPageElement]: + def _player_to_video(self) -> List[IliasPageElement]: # Fetch the actual video page. This is a small wrapper page initializing a javscript # player. Sadly we can not execute that JS. The actual video stream url is nowhere # on the page, but defined in a JS object inside a script tag, passed to the player # library. # We do the impossible and RegEx the stream JSON object out of the page's HTML source - regex = re.compile(r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE) + regex = re.compile( + r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE + ) json_match = regex.search(str(self._soup)) if json_match is None: @@ -640,120 +345,91 @@ class IliasPage: # and just fetch the lone video url! 
if len(streams) == 1: video_url = streams[0]["sources"]["mp4"][0]["src"] - return [ - IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name) - ] + return [IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, self._source_name)] log.explain(f"Found multiple videos for stream at {self._source_name}") items = [] for stream in sorted(streams, key=lambda stream: stream["content"]): full_name = f"{self._source_name.replace('.mp4', '')} ({stream['content']}).mp4" video_url = stream["sources"]["mp4"][0]["src"] - items.append(IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) + items.append(IliasPageElement(IliasElementType.OPENCAST_VIDEO, video_url, full_name)) return items - def _get_show_max_forum_entries_per_page_url( - self, wanted_max: Optional[int] = None - ) -> Optional[IliasPageElement]: + def _get_show_max_forum_entries_per_page_url(self) -> Optional[IliasPageElement]: correct_link = self._soup.find( - "a", attrs={"href": lambda x: x is not None and "trows=800" in x and "cmd=showThreads" in x} + "a", + attrs={"href": lambda x: x and "trows=800" in x and "cmd=showThreads" in x} ) if not correct_link: return None link = self._abs_url_from_link(correct_link) - if wanted_max is not None: - link = link.replace("trows=800", f"trows={wanted_max}") - return IliasPageElement.create_new(IliasElementType.FORUM, link, "show all forum threads") + return IliasPageElement(IliasElementType.FORUM, link, "show all forum threads") - def _get_forum_thread_count(self) -> Optional[int]: - log.explain_topic("Trying to find forum thread count") + def _find_personal_desktop_entries(self) -> List[IliasPageElement]: + items: List[IliasPageElement] = [] - candidates = cast(list[Tag], self._soup.select(".ilTableFootLight")) - extract_regex = re.compile(r"\s(?P\d+)\s*\)") - - for candidate in candidates: - log.explain(f"Found thread count candidate: {candidate}") - if match := 
extract_regex.search(candidate.get_text()): - return int(match.group("max")) - else: - log.explain("Found no candidates to extract thread count from") - - return None - - def _find_personal_desktop_entries(self) -> list[IliasPageElement]: - items: list[IliasPageElement] = [] - - titles: list[Tag] = self._soup.select("#block_pditems_0 .il-item-title") + titles: List[Tag] = self._soup.select(".il-item-title") for title in titles: link = title.find("a") - - if not link: - log.explain(f"Skipping offline item: {title.get_text().strip()!r}") - continue - - name = sanitize_path_name(link.text.strip()) + name = _sanitize_path_name(link.text.strip()) url = self._abs_url_from_link(link) - if "cmd=manage" in url and "cmdClass=ilPDSelectedItemsBlockGUI" in url: - # Configure button/link does not have anything interesting - continue - - typ = IliasPage._find_type_for_element( - name, url, lambda: IliasPage._find_icon_for_folder_entry(cast(Tag, link)) - ) - if not typ: + type = self._find_type_from_link(name, link, url) + if not type: _unexpected_html_warning() log.warn_contd(f"Could not extract type for {link}") continue - log.explain(f"Found {name!r} of type {typ}") + log.explain(f"Found {name!r}") - items.append(IliasPageElement.create_new(typ, url, name)) + if type == IliasElementType.FILE and "_download" not in url: + url = re.sub(r"(target=file_\d+)", r"\1_download", url) + log.explain("Rewired file URL to include download part") + + items.append(IliasPageElement(type, url, name)) return items - def _find_copa_entries(self) -> list[IliasPageElement]: - items: list[IliasPageElement] = [] - links: list[Tag] = cast(list[Tag], self._soup.find_all(class_="ilc_flist_a_FileListItemLink")) + def _find_copa_entries(self) -> List[IliasPageElement]: + items: List[IliasPageElement] = [] + links: List[Tag] = self._soup.findAll(class_="ilc_flist_a_FileListItemLink") for link in links: url = self._abs_url_from_link(link) - name = re.sub(r"\([\d,.]+ [MK]B\)", "", 
link.get_text()).strip().replace("\t", "") - name = sanitize_path_name(name) + name = re.sub(r"\([\d,.]+ [MK]B\)", "", link.getText()).strip().replace("\t", "") + name = _sanitize_path_name(name) if "file_id" not in url: _unexpected_html_warning() log.warn_contd(f"Found unknown content page item {name!r} with url {url!r}") continue - items.append(IliasPageElement.create_new(IliasElementType.FILE, url, name)) + items.append(IliasPageElement(IliasElementType.FILE, url, name)) return items - def _find_info_tab_entries(self) -> list[IliasPageElement]: + def _find_info_tab_entries(self) -> List[IliasPageElement]: items = [] - links: list[Tag] = self._soup.select("a.il_ContainerItemCommand") + links: List[Tag] = self._soup.select("a.il_ContainerItemCommand") for link in links: - log.explain(f"Found info tab link: {self._abs_url_from_link(link)}") - if "cmdclass=ilobjcoursegui" not in cast(str, link["href"]).lower(): + if "cmdClass=ilobjcoursegui" not in link["href"]: continue - if "cmd=sendfile" not in cast(str, link["href"]).lower(): + if "cmd=sendfile" not in link["href"]: continue - items.append( - IliasPageElement.create_new( - IliasElementType.FILE, self._abs_url_from_link(link), sanitize_path_name(link.get_text()) - ) - ) + items.append(IliasPageElement( + IliasElementType.FILE, + self._abs_url_from_link(link), + _sanitize_path_name(link.getText()) + )) - log.explain(f"Found {len(items)} info tab entries {items}") return items - def _find_opencast_video_entries(self) -> list[IliasPageElement]: + def _find_opencast_video_entries(self) -> List[IliasPageElement]: # ILIAS has three stages for video pages # 1. The initial dummy page without any videos. This page contains the link to the listing # 2. The video listing which might be paginated @@ -761,58 +437,59 @@ class IliasPage: # # We need to figure out where we are. 
- video_element_table = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + video_element_table: Tag = self._soup.find( + name="table", id=re.compile(r"tbl_xoct_.+") + ) if video_element_table is None: # We are in stage 1 # The page is actually emtpy but contains the link to stage 2 - content_link: Tag = cast(Tag, self._soup.select_one("#tab_series a")) + content_link: Tag = self._soup.select_one("#tab_series a") url: str = self._abs_url_from_link(content_link) query_params = {"limit": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(url, query_params) log.explain("Found ILIAS video frame page, fetching actual content next") - return [ - IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "") - ] + return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER_MAYBE_PAGINATED, url, "")] is_paginated = self._soup.find(id=re.compile(r"tab_page_sel.+")) is not None - if is_paginated and self._page_type != IliasElementType.OPENCAST_VIDEO_FOLDER: + if is_paginated and not self._page_type == IliasElementType.OPENCAST_VIDEO_FOLDER: # We are in stage 2 - try to break pagination return self._find_opencast_video_entries_paginated() return self._find_opencast_video_entries_no_paging() - def _find_opencast_video_entries_paginated(self) -> list[IliasPageElement]: - table_element = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) + def _find_opencast_video_entries_paginated(self) -> List[IliasPageElement]: + table_element: Tag = self._soup.find(name="table", id=re.compile(r"tbl_xoct_.+")) if table_element is None: log.warn("Couldn't increase elements per page (table not found). I might miss elements.") return self._find_opencast_video_entries_no_paging() - id_match = re.match(r"tbl_xoct_(.+)", cast(str, table_element.attrs["id"])) + id_match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) if id_match is None: log.warn("Couldn't increase elements per page (table id not found). 
I might miss elements.") return self._find_opencast_video_entries_no_paging() table_id = id_match.group(1) - query_params = {f"tbl_xoct_{table_id}_trows": "800", "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} + query_params = {f"tbl_xoct_{table_id}_trows": "800", + "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} url = url_set_query_params(self._page_url, query_params) log.explain("Disabled pagination, retrying folder as a new entry") - return [IliasPageElement.create_new(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] + return [IliasPageElement(IliasElementType.OPENCAST_VIDEO_FOLDER, url, "")] - def _find_opencast_video_entries_no_paging(self) -> list[IliasPageElement]: + def _find_opencast_video_entries_no_paging(self) -> List[IliasPageElement]: """ Crawls the "second stage" video page. This page contains the actual video urls. """ # Video start links are marked with an "Abspielen" link - video_links = cast( - list[Tag], self._soup.find_all(name="a", text=re.compile(r"\s*(Abspielen|Play)\s*")) + video_links: List[Tag] = self._soup.findAll( + name="a", text=re.compile(r"\s*(Abspielen|Play)\s*") ) - results: list[IliasPageElement] = [] + results: List[IliasPageElement] = [] for link in video_links: results.append(self._listed_opencast_video_to_element(link)) @@ -824,10 +501,12 @@ class IliasPage: # 6th or 7th child (1 indexed) is the modification time string. 
Try to find it # by parsing backwards from the end and finding something that looks like a date modification_time = None - row: Tag = link.parent.parent.parent # type: ignore + row: Tag = link.parent.parent.parent column_count = len(row.select("td.std")) for index in range(column_count, 0, -1): - modification_string = cast(Tag, row.select_one(f"td.std:nth-child({index})")).get_text().strip() + modification_string = link.parent.parent.parent.select_one( + f"td.std:nth-child({index})" + ).getText().strip() if match := re.search(r"\d+\.\d+.\d+ \d+:\d+", modification_string): modification_time = datetime.strptime(match.group(0), "%d.%m.%Y %H:%M") break @@ -836,166 +515,138 @@ class IliasPage: log.warn(f"Could not determine upload time for {link}") modification_time = datetime.now() - title = cast(Tag, row.select_one("td.std:nth-child(3)")).get_text().strip() + title = link.parent.parent.parent.select_one("td.std:nth-child(3)").getText().strip() title += ".mp4" - video_name: str = sanitize_path_name(title) + video_name: str = _sanitize_path_name(title) video_url = self._abs_url_from_link(link) log.explain(f"Found video {video_name!r} at {video_url}") - return IliasPageElement.create_new( + return IliasPageElement( IliasElementType.OPENCAST_VIDEO_PLAYER, video_url, video_name, modification_time ) - def _find_exercise_entries(self) -> list[IliasPageElement]: + def _find_exercise_entries(self) -> List[IliasPageElement]: if self._soup.find(id="tab_submission"): - log.explain("Found submission tab. This is an exercise detail or files page") - if self._soup.select_one("#tab_submission.active") is None: - log.explain(" This is a details page") - return self._find_exercise_entries_detail_page() - else: - log.explain(" This is a files page") - return self._find_exercise_entries_files_page() - + log.explain("Found submission tab. This is an exercise detail page") + return self._find_exercise_entries_detail_page() log.explain("Found no submission tab. 
This is an exercise root page") return self._find_exercise_entries_root_page() - def _find_exercise_entries_detail_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] + def _find_exercise_entries_detail_page(self) -> List[IliasPageElement]: + results: List[IliasPageElement] = [] - if link := self._soup.select_one("#tab_submission > a"): - results.append( - IliasPageElement.create_new( - IliasElementType.EXERCISE_FILES, self._abs_url_from_link(link), "Submission" - ) - ) - else: - log.explain("Found no submission link for exercise, maybe it has not started yet?") - - # Find all download links in the container (this will contain all the *feedback* files) - download_links = cast( - list[Tag], - self._soup.find_all( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x is not None and "cmd=download" in x}, - text="Download", - ), + # Find all download links in the container (this will contain all the files) + download_links: List[Tag] = self._soup.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmd=download" in x}, + text="Download" ) for link in download_links: - parent_row: Tag = cast( - Tag, link.find_parent(attrs={"class": lambda x: x is not None and "row" in x}) - ) - name_tag = parent_row.find(name="div") + parent_row: Tag = link.findParent("tr") + children: List[Tag] = parent_row.findChildren("td") - if not name_tag: - log.warn("Could not find name tag for exercise entry") - _unexpected_html_warning() - continue - - name = sanitize_path_name(name_tag.get_text().strip()) + name = _sanitize_path_name(children[1].getText().strip()) log.explain(f"Found exercise detail entry {name!r}") - results.append( - IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name) - ) - - return results - - def _find_exercise_entries_files_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] - - # Find all 
download links in the container - download_links = cast( - list[Tag], - self._soup.find_all( - name="a", - # download links contain the given command class - attrs={"href": lambda x: x is not None and "cmd=download" in x}, - text="Download", - ), - ) - - for link in download_links: - parent_row: Tag = cast(Tag, link.find_parent("tr")) - children = cast(list[Tag], parent_row.find_all("td")) - - name = sanitize_path_name(children[1].get_text().strip()) - log.explain(f"Found exercise file entry {name!r}") - - date = None for child in reversed(children): - date = demangle_date(child.get_text().strip(), fail_silently=True) + date = demangle_date(child.getText().strip(), fail_silently=True) if date is not None: break if date is None: - log.warn(f"Date parsing failed for exercise file entry {name!r}") + log.warn(f"Date parsing failed for exercise entry {name!r}") - results.append( - IliasPageElement.create_new(IliasElementType.FILE, self._abs_url_from_link(link), name, date) - ) + results.append(IliasPageElement( + IliasElementType.FILE, + self._abs_url_from_link(link), + name, + date + )) return results - def _find_exercise_entries_root_page(self) -> list[IliasPageElement]: - results: list[IliasPageElement] = [] + def _find_exercise_entries_root_page(self) -> List[IliasPageElement]: + results: List[IliasPageElement] = [] - content_tab = self._soup.find(id="ilContentContainer") - if not content_tab: - log.warn("Could not find content tab in exercise overview page") - _unexpected_html_warning() - return [] + # Each assignment is in an accordion container + assignment_containers: List[Tag] = self._soup.select(".il_VAccordionInnerContainer") - exercise_links = content_tab.select(".il-item-title a") + for container in assignment_containers: + # Fetch the container name out of the header to use it in the path + container_name = container.select_one(".ilAssignmentHeader").getText().strip() + log.explain(f"Found exercise container {container_name!r}") - for exercise in 
cast(list[Tag], exercise_links): - if "href" not in exercise.attrs: - continue - href = exercise.attrs["href"] - if type(href) is not str: - continue - if "ass_id=" not in href or "cmdclass=ilassignmentpresentationgui" not in href.lower(): - continue + # Find all download links in the container (this will contain all the files) + files: List[Tag] = container.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, + text="Download" + ) - name = sanitize_path_name(exercise.get_text().strip()) - results.append( - IliasPageElement.create_new( - IliasElementType.EXERCISE, self._abs_url_from_link(exercise), name + # Grab each file as you now have the link + for file_link in files: + # Two divs, side by side. Left is the name, right is the link ==> get left + # sibling + file_name = file_link.parent.findPrevious(name="div").getText().strip() + file_name = _sanitize_path_name(file_name) + url = self._abs_url_from_link(file_link) + + log.explain(f"Found exercise entry {file_name!r}") + results.append(IliasPageElement( + IliasElementType.FILE, + url, + container_name + "/" + file_name, + None # We do not have any timestamp + )) + + # Find all links to file listings (e.g. 
"Submitted Files" for groups) + file_listings: List[Tag] = container.findAll( + name="a", + # download links contain the given command class + attrs={"href": lambda x: x and "cmdclass=ilexsubmissionfilegui" in x.lower()} + ) + + # Add each listing as a new + for listing in file_listings: + parent_container: Tag = listing.findParent( + "div", attrs={"class": lambda x: x and "form-group" in x} ) - ) - - for result in results: - log.explain(f"Found exercise {result.name!r}") + label_container: Tag = parent_container.find( + attrs={"class": lambda x: x and "control-label" in x} + ) + file_name = _sanitize_path_name(label_container.getText().strip()) + url = self._abs_url_from_link(listing) + log.explain(f"Found exercise detail {file_name!r} at {url}") + results.append(IliasPageElement( + IliasElementType.EXERCISE_FILES, + url, + container_name + "/" + file_name, + None # we do not have any timestamp + )) return results - def _find_normal_entries(self) -> list[IliasPageElement]: - result: list[IliasPageElement] = [] + def _find_normal_entries(self) -> List[IliasPageElement]: + result: List[IliasPageElement] = [] - links: list[Tag] = [] # Fetch all links and throw them to the general interpreter - if self._is_course_overview_page(): - log.explain("Page is a course overview page, adjusting link selector") - links.extend(self._soup.select(".il-item-title > a")) - else: - links.extend(self._soup.select("a.il_ContainerItemTitle")) + links: List[Tag] = self._soup.select("a.il_ContainerItemTitle") for link in links: abs_url = self._abs_url_from_link(link) - # Make sure parents are sanitized. 
We do not want accidental parents - parents = [sanitize_path_name(x) for x in IliasPage._find_upwards_folder_hierarchy(link)] + parents = self._find_upwards_folder_hierarchy(link) if parents: - element_name = "/".join(parents) + "/" + sanitize_path_name(link.get_text()) + element_name = "/".join(parents) + "/" + _sanitize_path_name(link.getText()) else: - element_name = sanitize_path_name(link.get_text()) + element_name = _sanitize_path_name(link.getText()) - element_type = IliasPage._find_type_for_element( - element_name, abs_url, lambda: IliasPage._find_icon_for_folder_entry(link) - ) - description = IliasPage._find_link_description(link) + element_type = self._find_type_from_link(element_name, link, abs_url) + description = self._find_link_description(link) # The last meeting on every page is expanded by default. # Its content is then shown inline *and* in the meeting page itself. @@ -1005,113 +656,59 @@ class IliasPage: if not element_type: continue + if element_type == IliasElementType.MEETING: + normalized = _sanitize_path_name(self._normalize_meeting_name(element_name)) + log.explain(f"Normalized meeting name from {element_name!r} to {normalized!r}") + element_name = normalized elif element_type == IliasElementType.FILE: - result.append(IliasPage._file_to_element(element_name, abs_url, link)) + result.append(self._file_to_element(element_name, abs_url, link)) continue - log.explain(f"Found {element_name!r} of type {element_type}") - result.append( - IliasPageElement.create_new( - element_type, abs_url, element_name, description=description, skip_sanitize=True - ) - ) + log.explain(f"Found {element_name!r}") + result.append(IliasPageElement(element_type, abs_url, element_name, description=description)) result += self._find_cards() result += self._find_mediacast_videos() - result += self._find_mob_videos() return result - def _find_mediacast_videos(self) -> list[IliasPageElement]: - videos: list[IliasPageElement] = [] + def _find_mediacast_videos(self) -> 
List[IliasPageElement]: + videos: List[IliasPageElement] = [] - regex = re.compile(r"il\.VideoPlaylist\.init.+?\[(.+?)], ") - for script in cast(list[Tag], self._soup.find_all("script")): - for match in regex.finditer(script.text): - try: - playlist = json.loads("[" + match.group(1) + "]") - except json.JSONDecodeError: - log.warn("Could not decode playlist json") - log.warn_contd(f"Playlist json: [{match.group(1)}]") - continue - for elem in playlist: - title = elem.get("title", None) - description = elem.get("description", None) - url = elem.get("resource", None) - if title is None or description is None or url is None: - log.explain(f"Mediacast json: {match.group(1)}") - log.warn("Mediacast video json was not complete") - if title is None: - log.warn_contd("Missing title") - if description is None: - log.warn_contd("Missing description") - if url is None: - log.warn_contd("Missing URL") - - if not title.endswith(".mp4") and not title.endswith(".webm"): - # just to make sure it has some kinda-alrightish ending - title = title + ".mp4" - videos.append( - IliasPageElement.create_new( - typ=IliasElementType.MEDIACAST_VIDEO, - url=self._abs_url_from_relative(cast(str, url)), - name=sanitize_path_name(title), - ) - ) - - return videos - - def _find_mob_videos(self) -> list[IliasPageElement]: - videos: list[IliasPageElement] = [] - - selector = "figure.ilc_media_cont_MediaContainerHighlighted,figure.ilc_media_cont_MediaContainer" - for figure in self._soup.select(selector): - video_element = figure.select_one("video") - if not video_element: - continue - - url, title = self._find_mob_video_url_title(video_element, figure) - - if url is None: - _unexpected_html_warning() - log.warn_contd(f"No element found for mob video '{title}'") - continue - - if urlparse(url).hostname != urlparse(self._page_url).hostname: - log.explain(f"Found external video at {url}, ignoring") - continue - - videos.append( - IliasPageElement.create_new( - typ=IliasElementType.MOB_VIDEO, url=url, 
name=sanitize_path_name(title), mtime=None - ) + for elem in cast(List[Tag], self._soup.select(".ilPlayerPreviewOverlayOuter")): + element_name = _sanitize_path_name( + elem.select_one(".ilPlayerPreviewDescription").getText().strip() ) + if not element_name.endswith(".mp4"): + # just to make sure it has some kinda-alrightish ending + element_name = element_name + ".mp4" + video_element = elem.find(name="video") + if not video_element: + _unexpected_html_warning() + log.warn_contd(f"No