duplicated files for integration of FAU login

This commit is contained in:
NIKL45 2025-10-18 23:43:57 +02:00
parent 207af51aa4
commit 4301dda48f
6 changed files with 215 additions and 1 deletions

View file

@ -10,5 +10,6 @@
from . import command_local # noqa: F401 imported but unused from . import command_local # noqa: F401 imported but unused
from . import command_ilias_web # noqa: F401 imported but unused from . import command_ilias_web # noqa: F401 imported but unused
from . import command_kit_ilias_web # noqa: F401 imported but unused from . import command_kit_ilias_web # noqa: F401 imported but unused
from . import command_fau_ilias_web # noqa: F401 imported but unused
from . import command_kit_ipd # noqa: F401 imported but unused from . import command_kit_ipd # noqa: F401 imported but unused
from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused from .parser import PARSER, ParserLoadError, load_default_section # noqa: F401 imported but unused

View file

@ -0,0 +1,37 @@
import argparse
import configparser
from ..logging import log
from .common_ilias_args import configure_common_group_args, load_common
from .parser import CRAWLER_PARSER, SUBPARSERS, load_crawler
COMMAND_NAME = "fau-ilias-web"
SUBPARSER = SUBPARSERS.add_parser(
COMMAND_NAME,
parents=[CRAWLER_PARSER],
)
GROUP = SUBPARSER.add_argument_group(
title=f"{COMMAND_NAME} crawler arguments",
description=f"arguments for the '{COMMAND_NAME}' crawler",
)
configure_common_group_args(GROUP)
def load(
args: argparse.Namespace,
parser: configparser.ConfigParser,
) -> None:
log.explain(f"Creating config for command '{COMMAND_NAME}'")
parser["crawl:ilias"] = {}
section = parser["crawl:ilias"]
load_crawler(args, section)
section["type"] = COMMAND_NAME
load_common(section, args, parser)
SUBPARSER.set_defaults(command=load)

View file

@ -4,7 +4,7 @@ from typing import Callable, Dict
from ..auth import Authenticator from ..auth import Authenticator
from ..config import Config from ..config import Config
from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401 from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection, FauIliasWebCrawler, FauIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection from .local_crawler import LocalCrawler, LocalCrawlerSection
@ -22,6 +22,8 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a), IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
"kit-ilias-web": lambda n, s, c, a: "kit-ilias-web": lambda n, s, c, a:
KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a), KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
"fau-ilias-web": lambda n, s, c, a:
FauIliasWebCrawler(n, FauIliasWebCrawlerSection(s), c, a),
"kit-ipd": lambda n, s, c, a: "kit-ipd": lambda n, s, c, a:
KitIpdCrawler(n, KitIpdCrawlerSection(s), c), KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
} }

View file

@ -1,9 +1,12 @@
from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, from .kit_ilias_web_crawler import (IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler,
KitIliasWebCrawlerSection) KitIliasWebCrawlerSection)
from .fau_ilias_web_crawler import (FauIliasWebCrawler, FauIliasWebCrawlerSection)
__all__ = [ __all__ = [
"IliasWebCrawler", "IliasWebCrawler",
"IliasWebCrawlerSection", "IliasWebCrawlerSection",
"KitIliasWebCrawler", "KitIliasWebCrawler",
"KitIliasWebCrawlerSection", "KitIliasWebCrawlerSection",
"FauIliasWebCrawler",
"FauIliasWebCrawlerSection",
] ]

View file

@ -0,0 +1,35 @@
from typing import Dict, Literal
from ...auth import Authenticator
from ...config import Config
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection
from .fau_shibboleth_login import FauShibbolethLogin
_ILIAS_URL = "https://www.studon.fau.de/studon"
class KitShibbolethBackgroundLoginSuccessful:
pass
class FauIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str:
return _ILIAS_URL
def login(self) -> Literal["shibboleth"]:
return "shibboleth"
class FauIliasWebCrawler(IliasWebCrawler):
def __init__(
self,
name: str,
section: FauIliasWebCrawlerSection,
config: Config,
authenticators: Dict[str, Authenticator],
):
super().__init__(name, section, config, authenticators)
self._shibboleth_login = FauShibbolethLogin(
_ILIAS_URL,
self._auth,
section.tfa_auth(authenticators),
)

View file

@ -0,0 +1,136 @@
"""
FAU-specific Shibboleth login helper.
This module duplicates the original KIT-targeted Shibboleth login implementation
but exposes the same API so it can be swapped in where FAU-specific tweaks are
required. Keep behaviour identical to the original unless changes are needed.
"""
from typing import Any, Optional, cast
import aiohttp
import yarl
from bs4 import BeautifulSoup, Tag
from ...auth import Authenticator, TfaAuthenticator
from ...logging import log
from ...utils import soupify
from ..crawler import CrawlError
class FauShibbolethLogin:
"""
Login via shibboleth system for FAU.
"""
def __init__(
self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
) -> None:
self._ilias_url = ilias_url
self._auth = authenticator
self._tfa_auth = tfa_authenticator
async def login(self, sess: aiohttp.ClientSession) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
cookies it receieves.
This function should only be called whenever it is detected that you're
not logged in. The cookies obtained should be good for a few minutes,
maybe even an hour or two.
"""
# Equivalent: Click on "Bei StudOn via Single Sign-On anmelden" button in
# https://www.studon.fau.de/studon/login.php
url = f"{self._ilias_url}/saml.php"
async with sess.get(url) as response:
shib_url = response.url
if str(shib_url).startswith(self._ilias_url):
log.explain(
"ILIAS recognized our shib token and logged us in in the background, returning"
)
return
soup: BeautifulSoup = soupify(await response.read())
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"j_username": username,
"j_password": password,
"fudis_web_authn_assertion_input": "",
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
soup = await _post(sess, url, data)
if soup.find(id="attributeRelease"):
raise CrawlError(
"ILIAS Shibboleth entitlements changed! "
"Please log in once in your browser and review them"
)
if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup):
self._auth.invalidate_credentials()
# Equivalent: Being redirected via JS automatically
# (or clicking "Continue" if you have JS disabled)
relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
url = form = soup.find("form", {"method": "post"})["action"] # type: ignore
data = { # using the info obtained in the while loop above
"RelayState": cast(str, relay_state["value"]),
"SAMLResponse": cast(str, saml_response["value"]),
}
await sess.post(cast(str, url), data=data)
async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup:
if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
tfa_token = await self._tfa_auth.password()
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = cast(Tag, soup.find("form", {"method": "post"}))
action = cast(str, form["action"])
# Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = {
"_eventId_proceed": "",
"fudis_otp_input": tfa_token,
}
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
data["csrf_token"] = csrf_token_input["value"] # type: ignore
return await _post(session, url, data)
@staticmethod
def _login_successful(soup: BeautifulSoup) -> bool:
relay_state = soup.find("input", {"name": "RelayState"})
saml_response = soup.find("input", {"name": "SAMLResponse"})
return relay_state is not None and saml_response is not None
@staticmethod
def _tfa_required(soup: BeautifulSoup) -> bool:
return soup.find(id="fudiscr-form") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
async with session.post(url, data=data) as response:
return soupify(await response.read())