diff --git a/PFERD/cli/__init__.py b/PFERD/cli/__init__.py
index c89f6f4..0be8c33 100644
--- a/PFERD/cli/__init__.py
+++ b/PFERD/cli/__init__.py
@@ -10,5 +10,6 @@
 from . import command_local  # noqa: F401 imported but unused
 from . import command_ilias_web  # noqa: F401 imported but unused
 from . import command_kit_ilias_web  # noqa: F401 imported but unused
+from . import command_fau_ilias_web  # noqa: F401 imported but unused
 from . import command_kit_ipd  # noqa: F401 imported but unused
 from .parser import PARSER, ParserLoadError, load_default_section  # noqa: F401 imported but unused
diff --git a/PFERD/cli/command_fau_ilias_web.py b/PFERD/cli/command_fau_ilias_web.py
new file mode 100644
index 0000000..7688783
--- /dev/null
+++ b/PFERD/cli/command_fau_ilias_web.py
@@ -0,0 +1,37 @@
+import argparse
+import configparser
+
+from ..logging import log
+from .common_ilias_args import configure_common_group_args, load_common
+from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
+
+COMMAND_NAME = "fau-ilias-web"
+
+SUBPARSER = SUBPARSERS.add_parser(
+    COMMAND_NAME,
+    parents=[CRAWLER_PARSER],
+)
+
+GROUP = SUBPARSER.add_argument_group(
+    title=f"{COMMAND_NAME} crawler arguments",
+    description=f"arguments for the '{COMMAND_NAME}' crawler",
+)
+
+configure_common_group_args(GROUP)
+
+
+def load(
+    args: argparse.Namespace,
+    parser: configparser.ConfigParser,
+) -> None:
+    log.explain(f"Creating config for command '{COMMAND_NAME}'")
+
+    parser["crawl:ilias"] = {}
+    section = parser["crawl:ilias"]
+    load_crawler(args, section)
+
+    section["type"] = COMMAND_NAME
+    load_common(section, args, parser)
+
+
+SUBPARSER.set_defaults(command=load)
diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py
index 9a0e080..22028de 100644
--- a/PFERD/crawl/__init__.py
+++ b/PFERD/crawl/__init__.py
@@ -4,7 +4,8 @@ from typing import Callable, Dict
 
 from ..auth import Authenticator
 from ..config import Config
 from .crawler import Crawler, CrawlError, CrawlerSection  # noqa: F401
-from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection
+from .ilias import (FauIliasWebCrawler, FauIliasWebCrawlerSection, IliasWebCrawler, IliasWebCrawlerSection,
+                    KitIliasWebCrawler, KitIliasWebCrawlerSection)
 from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
 from .local_crawler import LocalCrawler, LocalCrawlerSection
@@ -22,6 +23,8 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
         IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
     "kit-ilias-web": lambda n, s, c, a:
         KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
+    "fau-ilias-web": lambda n, s, c, a:
+        FauIliasWebCrawler(n, FauIliasWebCrawlerSection(s), c, a),
     "kit-ipd": lambda n, s, c, a:
         KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
 }
diff --git a/PFERD/crawl/ilias/__init__.py b/PFERD/crawl/ilias/__init__.py
index 287bd3d..9f997e5 100644
--- a/PFERD/crawl/ilias/__init__.py
+++ b/PFERD/crawl/ilias/__init__.py
@@ -1,9 +1,12 @@
 from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
                                     KitIliasWebCrawlerSection)
+from .fau_ilias_web_crawler import FauIliasWebCrawler, FauIliasWebCrawlerSection
 
 __all__ = [
     "IliasWebCrawler",
     "IliasWebCrawlerSection",
     "KitIliasWebCrawler",
     "KitIliasWebCrawlerSection",
+    "FauIliasWebCrawler",
+    "FauIliasWebCrawlerSection",
 ]
diff --git a/PFERD/crawl/ilias/fau_ilias_web_crawler.py b/PFERD/crawl/ilias/fau_ilias_web_crawler.py
new file mode 100644
index 0000000..f26b2d1
--- /dev/null
+++ b/PFERD/crawl/ilias/fau_ilias_web_crawler.py
@@ -0,0 +1,33 @@
+from typing import Dict, Literal
+
+from ...auth import Authenticator
+from ...config import Config
+from .fau_shibboleth_login import FauShibbolethLogin
+from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
+
+_ILIAS_URL = "https://www.studon.fau.de/studon"
+
+
+class FauIliasWebCrawlerSection(IliasWebCrawlerSection):
+    def base_url(self) -> str:
+        return _ILIAS_URL
+
+    def login(self) -> Literal["shibboleth"]:
+        return "shibboleth"
+
+
+class FauIliasWebCrawler(IliasWebCrawler):
+    def __init__(
+        self,
+        name: str,
+        section: FauIliasWebCrawlerSection,
+        config: Config,
+        authenticators: Dict[str, Authenticator],
+    ):
+        super().__init__(name, section, config, authenticators)
+
+        self._shibboleth_login = FauShibbolethLogin(
+            _ILIAS_URL,
+            self._auth,
+            section.tfa_auth(authenticators),
+        )
diff --git a/PFERD/crawl/ilias/fau_shibboleth_login.py b/PFERD/crawl/ilias/fau_shibboleth_login.py
new file mode 100644
index 0000000..50a54a7
--- /dev/null
+++ b/PFERD/crawl/ilias/fau_shibboleth_login.py
@@ -0,0 +1,135 @@
+"""
+FAU-specific Shibboleth login helper.
+
+This module duplicates the original KIT-targeted Shibboleth login implementation
+but exposes the same API so it can be swapped in where FAU-specific tweaks are
+required. Keep behaviour identical to the original unless changes are needed.
+"""
+from typing import Any, Optional, cast
+
+import aiohttp
+import yarl
+from bs4 import BeautifulSoup, Tag
+
+from ...auth import Authenticator, TfaAuthenticator
+from ...logging import log
+from ...utils import soupify
+from ..crawler import CrawlError
+
+
+class FauShibbolethLogin:
+    """
+    Login via shibboleth system for FAU.
+    """
+
+    def __init__(
+        self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
+    ) -> None:
+        self._ilias_url = ilias_url
+        self._auth = authenticator
+        self._tfa_auth = tfa_authenticator
+
+    async def login(self, sess: aiohttp.ClientSession) -> None:
+        """
+        Performs the ILIAS Shibboleth authentication dance and saves the login
+        cookies it receives.
+
+        This function should only be called whenever it is detected that you're
+        not logged in. The cookies obtained should be good for a few minutes,
+        maybe even an hour or two.
+        """
+
+        # Equivalent: Click on "Bei StudOn via Single Sign-On anmelden" button in
+        # https://www.studon.fau.de/studon/login.php
+        url = f"{self._ilias_url}/saml.php"
+        async with sess.get(url) as response:
+            shib_url = response.url
+            if str(shib_url).startswith(self._ilias_url):
+                log.explain(
+                    "ILIAS recognized our shib token and logged us in in the background, returning"
+                )
+                return
+            soup: BeautifulSoup = soupify(await response.read())
+
+        # Attempt to login using credentials, if necessary
+        while not self._login_successful(soup):
+            # Searching the form here so that this fails before asking for
+            # credentials rather than after asking.
+            form = cast(Tag, soup.find("form", {"method": "post"}))
+            action = cast(str, form["action"])
+
+            # Equivalent: Enter credentials in the identity provider's
+            # Shibboleth SSO form
+            url = str(shib_url.origin()) + action
+            username, password = await self._auth.credentials()
+            data = {
+                "_eventId_proceed": "",
+                "j_username": username,
+                "j_password": password,
+                "fudis_web_authn_assertion_input": "",
+            }
+            if csrf_token_input := form.find("input", {"name": "csrf_token"}):
+                data["csrf_token"] = csrf_token_input["value"]  # type: ignore
+            soup = await _post(sess, url, data)
+
+            if soup.find(id="attributeRelease"):
+                raise CrawlError(
+                    "ILIAS Shibboleth entitlements changed! "
+                    "Please log in once in your browser and review them"
+                )
+
+            if self._tfa_required(soup):
+                soup = await self._authenticate_tfa(sess, soup, shib_url)
+
+            if not self._login_successful(soup):
+                self._auth.invalidate_credentials()
+
+        # Equivalent: Being redirected via JS automatically
+        # (or clicking "Continue" if you have JS disabled)
+        relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
+        saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
+        url = cast(str, cast(Tag, soup.find("form", {"method": "post"}))["action"])
+        data = {  # using the info obtained in the while loop above
+            "RelayState": cast(str, relay_state["value"]),
+            "SAMLResponse": cast(str, saml_response["value"]),
+        }
+        await sess.post(url, data=data)
+
+    async def _authenticate_tfa(
+        self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
+    ) -> BeautifulSoup:
+        if not self._tfa_auth:
+            self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
+
+        # Searching the form here so that this fails before asking for
+        # the TFA token rather than after asking.
+        form = cast(Tag, soup.find("form", {"method": "post"}))
+        action = cast(str, form["action"])
+
+        tfa_token = await self._tfa_auth.password()
+
+        # Equivalent: Enter the TFA token in the identity provider's
+        # Shibboleth SSO form
+        url = str(shib_url.origin()) + action
+        data = {
+            "_eventId_proceed": "",
+            "fudis_otp_input": tfa_token,
+        }
+        if csrf_token_input := form.find("input", {"name": "csrf_token"}):
+            data["csrf_token"] = csrf_token_input["value"]  # type: ignore
+        return await _post(session, url, data)
+
+    @staticmethod
+    def _login_successful(soup: BeautifulSoup) -> bool:
+        relay_state = soup.find("input", {"name": "RelayState"})
+        saml_response = soup.find("input", {"name": "SAMLResponse"})
+        return relay_state is not None and saml_response is not None
+
+    @staticmethod
+    def _tfa_required(soup: BeautifulSoup) -> bool:
+        return soup.find(id="fudiscr-form") is not None
+
+
+async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
+    async with session.post(url, data=data) as response:
+        return soupify(await response.read())