From 77c6e7d8168b9530e6fe88b9ca908ef1dc9e301c Mon Sep 17 00:00:00 2001 From: NIKL45 Date: Fri, 24 Oct 2025 21:25:07 +0200 Subject: [PATCH] created class SimpleSAMLLogin by duplicating the class ShibbolethLogin and making the two changes mentioned in Garmelon/PFERD/issues/126. TFA not tested yet. --- PFERD/crawl/ilias/simplesaml_login.py | 121 ++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 PFERD/crawl/ilias/simplesaml_login.py diff --git a/PFERD/crawl/ilias/simplesaml_login.py b/PFERD/crawl/ilias/simplesaml_login.py new file mode 100644 index 0000000..d6a629b --- /dev/null +++ b/PFERD/crawl/ilias/simplesaml_login.py @@ -0,0 +1,121 @@ +from typing import Any, Optional, cast + +import aiohttp +import yarl +from bs4 import BeautifulSoup, Tag + +from ...auth import Authenticator, TfaAuthenticator +from ...logging import log +from ...utils import soupify +from ..crawler import CrawlError + + +class SimpleSAMLLogin: + """ + Login via a SimpleSAML system. + + It performs a basic authentication by following the login redirect + and posting credentials to the indicated form. It also supports TFA similar to Shibboleth. + """ + + def __init__( + self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator] + ) -> None: + self._ilias_url = ilias_url + self._auth = authenticator + self._tfa_auth = tfa_authenticator + + async def login(self, sess: aiohttp.ClientSession) -> None: + """ + Perform a SimpleSAML login flow and populate the session cookies. + """ + + # Start at the local login entrypoint which may redirect to SimpleSAML + url = f"{self._ilias_url}/saml.php" + async with sess.get(url) as response: + saml_url = response.url + # If the redirect stayed on the ILIAS host, assume we're already logged in + if str(saml_url).startswith(self._ilias_url): + log.explain("ILIAS recognized our simple-saml token and logged us in in the background, returning") + return + soup: BeautifulSoup = soupify(await response.read()) + + # The SimpleSAML login page uses a form POST similar to Shibboleth. + # Attempt to login using credentials. + while not self._login_successful(soup): + form = cast(Tag, soup.find("form", {"method": "post"})) + action = cast(str, form["action"]) + if action.startswith("https"): # FAU uses full URL here + url = action + else: + url = str(saml_url.origin()) + action #KIT uses relative URL here + + username, password = await self._auth.credentials() + data = { + "username": username, + "password": password, + } + if csrf_token_input := form.find("input", {"name": "csrf_token"}): + data["csrf_token"] = csrf_token_input["value"] # type: ignore + + soup = await _post(sess, url, data) + + # Detect attribute release prompt + if soup.find(id="attributeRelease"): + raise CrawlError( + "ILIAS SimpleSAML entitlements changed! Please log in once in your browser and review them" + ) + + if self._tfa_required(soup): + soup = await self._authenticate_tfa(sess, soup, saml_url) + + if not self._login_successful(soup): + self._auth.invalidate_credentials() + + # Equivalent: Being redirected via JS automatically + # (or clicking "Continue" if you have JS disabled) + relay_state = cast(Tag, soup.find("input", {"name": "RelayState"})) + saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"})) + url = cast(str, cast(Tag, soup.find("form", {"method": "post"}))["action"]) + data = { # using the info obtained in the while loop above + "RelayState": cast(str, relay_state["value"]), + "SAMLResponse": cast(str, saml_response["value"]), + } + await sess.post(cast(str, url), data=data) + + async def _authenticate_tfa( + self, session: aiohttp.ClientSession, soup: BeautifulSoup, saml_url: yarl.URL + ) -> BeautifulSoup: + if not self._tfa_auth: + self._tfa_auth = TfaAuthenticator("ilias-anon-tfa") + + tfa_token = await self._tfa_auth.password() + + # Searching the form here so that this fails before asking for + # credentials rather than after asking. + form = cast(Tag, soup.find("form", {"method": "post"})) + action = cast(str, form["action"]) + + url = str(saml_url.origin()) + action + data = { + "_eventId_proceed": "", + "fudis_otp_input": tfa_token, + } + if csrf_token_input := form.find("input", {"name": "csrf_token"}): + data["csrf_token"] = csrf_token_input["value"] # type: ignore + return await _post(session, url, data) + + @staticmethod + def _login_successful(soup: BeautifulSoup) -> bool: + relay_state = soup.find("input", {"name": "RelayState"}) + saml_response = soup.find("input", {"name": "SAMLResponse"}) + return relay_state is not None and saml_response is not None + + @staticmethod + def _tfa_required(soup: BeautifulSoup) -> bool: + return soup.find(id="fudiscr-form") is not None + + +async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup: + async with session.post(url, data=data) as response: + return soupify(await response.read())