Add shibboleth config option for non KIT

Co-authored-by: MrPine <git@mr-pine.de>
This commit is contained in:
PinieP 2024-11-02 15:13:40 +01:00 committed by I-Al-Istannen
parent d27640b3f2
commit 1f417ec87d
3 changed files with 151 additions and 94 deletions

View file

@ -23,10 +23,20 @@ from .file_templates import Links, learning_module_template
from .ilias_html_cleaner import clean, insert_base_markup from .ilias_html_cleaner import clean, insert_base_markup
from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage, from .kit_ilias_html import (IliasElementType, IliasForumThread, IliasLearningModulePage, IliasPage,
IliasPageElement, _sanitize_path_name, parse_ilias_forum_export) IliasPageElement, _sanitize_path_name, parse_ilias_forum_export)
from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int] TargetType = Union[str, int]
class ShibbolethLoginType:
    """Marker returned by ``login()`` when ``login_type`` is "shibboleth".

    Carries no data; the crawler only checks for it with ``isinstance``.
    """
class LocalLoginType:
    """Settings returned by ``login()`` when ``login_type`` is "local"."""

    def __init__(self, client_id: str) -> None:
        # Value of the "client_id" config option, kept for the login request.
        self.client_id = client_id
class IliasWebCrawlerSection(HttpCrawlerSection): class IliasWebCrawlerSection(HttpCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
base_url = self.s.get("base_url") base_url = self.s.get("base_url")
@ -35,12 +45,32 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
return base_url return base_url
def client_id(self) -> str: def login(self) -> Union[ShibbolethLoginType, LocalLoginType]:
login_type = self.s.get("login_type")
if not login_type:
self.missing_value("login_type")
if login_type == "shibboleth":
return ShibbolethLoginType()
elif login_type == "local":
client_id = self.s.get("client_id") client_id = self.s.get("client_id")
if not client_id: if not client_id:
self.missing_value("client_id") self.missing_value("client_id")
return LocalLoginType(client_id)
return client_id self.invalid_value("login_type", login_type,
"Should be <shibboleth | local>")
def tfa_auth(
    self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
    """Resolve the optional ``tfa_auth`` option to an authenticator.

    Returns ``None`` when the option is not set. When it is set, the value
    is used as a key into ``authenticators``; an unknown key is reported
    through ``self.invalid_value`` (presumably raising - confirm against
    the section base class).
    """
    section_name: Optional[str] = self.s.get("tfa_auth")
    if section_name is not None:
        authenticator = authenticators.get(section_name)
        if authenticator is None:
            self.invalid_value(
                "tfa_auth", section_name, "No such auth section exists"
            )
        return authenticator
    return None
def target(self) -> TargetType: def target(self) -> TargetType:
target = self.s.get("target") target = self.s.get("target")
@ -57,7 +87,8 @@ class IliasWebCrawlerSection(HttpCrawlerSection):
# URL # URL
return target return target
self.invalid_value("target", target, "Should be <course id | desktop | kit ilias URL>") self.invalid_value(
"target", target, "Should be <course id | desktop | kit ilias URL>")
def links(self) -> Links: def links(self) -> Links:
type_str: Optional[str] = self.s.get("links") type_str: Optional[str] = self.s.get("links")
@ -156,7 +187,14 @@ instance's greatest bottleneck.
self._auth = auth self._auth = auth
self._base_url = section.base_url() self._base_url = section.base_url()
self._client_id = section.client_id() self._tfa_auth = section.tfa_auth(authenticators)
self._login_type = section.login()
if isinstance(self._login_type, LocalLoginType):
self._client_id = self._login_type.client_id
else:
self._shibboleth_login = ShibbolethLogin(
self._base_url, self._auth, self._tfa_auth)
self._target = section.target() self._target = section.target()
self._link_file_redirect_delay = section.link_redirect_delay() self._link_file_redirect_delay = section.link_redirect_delay()
@ -167,7 +205,8 @@ instance's greatest bottleneck.
async def _run(self) -> None: async def _run(self) -> None:
if isinstance(self._target, int): if isinstance(self._target, int):
log.explain_topic(f"Inferred crawl target: Course with id {self._target}") log.explain_topic(
f"Inferred crawl target: Course with id {self._target}")
await self._crawl_course(self._target) await self._crawl_course(self._target)
elif self._target == "desktop": elif self._target == "desktop":
log.explain_topic("Inferred crawl target: Personal desktop") log.explain_topic("Inferred crawl target: Personal desktop")
@ -230,7 +269,8 @@ instance's greatest bottleneck.
while next_stage_url: while next_stage_url:
soup = await self._get_page(next_stage_url) soup = await self._get_page(next_stage_url)
log.explain_topic(f"Parsing HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing HTML page for {
fmt_path(cl.path)}")
log.explain(f"URL: {next_stage_url}") log.explain(f"URL: {next_stage_url}")
# If we expect to find a root course, enforce it # If we expect to find a root course, enforce it
@ -366,7 +406,8 @@ instance's greatest bottleneck.
return None return None
else: else:
log.explain("Answer: Yes") log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension) element_path = element_path.with_name(
element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
@ -378,9 +419,11 @@ instance's greatest bottleneck.
@_iorepeat(3, "resolving link") @_iorepeat(3, "resolving link")
async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None: async def _download_link(self, element: IliasPageElement, link_template: str, dl: DownloadToken) -> None:
async with dl as (bar, sink): async with dl as (bar, sink):
export_url = element.url.replace("cmd=calldirectlink", "cmd=exportHTML") export_url = element.url.replace(
"cmd=calldirectlink", "cmd=exportHTML")
real_url = await self._resolve_link_target(export_url) real_url = await self._resolve_link_target(export_url)
self._write_link_content(link_template, real_url, element.name, element.description, sink) self._write_link_content(
link_template, real_url, element.name, element.description, sink)
def _write_link_content( def _write_link_content(
self, self,
@ -394,7 +437,8 @@ instance's greatest bottleneck.
content = content.replace("{{link}}", url) content = content.replace("{{link}}", url)
content = content.replace("{{name}}", name) content = content.replace("{{name}}", name)
content = content.replace("{{description}}", str(description)) content = content.replace("{{description}}", str(description))
content = content.replace("{{redirect_delay}}", str(self._link_file_redirect_delay)) content = content.replace(
"{{redirect_delay}}", str(self._link_file_redirect_delay))
sink.file.write(content.encode("utf-8")) sink.file.write(content.encode("utf-8"))
sink.done() sink.done()
@ -403,7 +447,8 @@ instance's greatest bottleneck.
element: IliasPageElement, element: IliasPageElement,
element_path: PurePath, element_path: PurePath,
) -> Optional[Coroutine[Any, Any, None]]: ) -> Optional[Coroutine[Any, Any, None]]:
log.explain_topic(f"Decision: Crawl Booking Link {fmt_path(element_path)}") log.explain_topic(f"Decision: Crawl Booking Link {
fmt_path(element_path)}")
log.explain(f"Links type is {self._links}") log.explain(f"Links type is {self._links}")
link_template_maybe = self._links.template() link_template_maybe = self._links.template()
@ -413,7 +458,8 @@ instance's greatest bottleneck.
return None return None
else: else:
log.explain("Answer: Yes") log.explain("Answer: Yes")
element_path = element_path.with_name(element_path.name + link_extension) element_path = element_path.with_name(
element_path.name + link_extension)
maybe_dl = await self.download(element_path, mtime=element.mtime) maybe_dl = await self.download(element_path, mtime=element.mtime)
if not maybe_dl: if not maybe_dl:
@ -446,7 +492,8 @@ instance's greatest bottleneck.
dl: DownloadToken, dl: DownloadToken,
) -> None: ) -> None:
async with dl as (bar, sink): async with dl as (bar, sink):
self._write_link_content(link_template, element.url, element.name, element.description, sink) self._write_link_content(
link_template, element.url, element.name, element.description, sink)
async def _resolve_link_target(self, export_url: str) -> str: async def _resolve_link_target(self, export_url: str) -> str:
async def impl() -> Optional[str]: async def impl() -> Optional[str]:
@ -470,7 +517,8 @@ instance's greatest bottleneck.
if target is not None: if target is not None:
return target return target
raise CrawlError("resolve_link_target failed even after authenticating") raise CrawlError(
"resolve_link_target failed even after authenticating")
async def _handle_opencast_video( async def _handle_opencast_video(
self, self,
@ -481,7 +529,8 @@ instance's greatest bottleneck.
if self.prev_report: if self.prev_report:
self.report.add_custom_value( self.report.add_custom_value(
_get_video_cache_key(element), _get_video_cache_key(element),
self.prev_report.get_custom_value(_get_video_cache_key(element)) self.prev_report.get_custom_value(
_get_video_cache_key(element))
) )
# A video might contain other videos, so let's "crawl" the video first # A video might contain other videos, so let's "crawl" the video first
@ -502,7 +551,8 @@ instance's greatest bottleneck.
# Mark all existing videos as known to ensure they do not get deleted during cleanup. # Mark all existing videos as known to ensure they do not get deleted during cleanup.
# We "downloaded" them, just without actually making a network request as we assumed # We "downloaded" them, just without actually making a network request as we assumed
# they did not change. # they did not change.
contained = self._previous_contained_opencast_videos(element, maybe_dl.path) contained = self._previous_contained_opencast_videos(
element, maybe_dl.path)
if len(contained) > 1: if len(contained) > 1:
# Only do this if we threw away the original dl token, # Only do this if we threw away the original dl token,
# to not download single-stream videos twice # to not download single-stream videos twice
@ -518,31 +568,38 @@ instance's greatest bottleneck.
) -> List[PurePath]: ) -> List[PurePath]:
if not self.prev_report: if not self.prev_report:
return [] return []
custom_value = self.prev_report.get_custom_value(_get_video_cache_key(element)) custom_value = self.prev_report.get_custom_value(
_get_video_cache_key(element))
if not custom_value: if not custom_value:
return [] return []
cached_value = cast(dict[str, Any], custom_value) cached_value = cast(dict[str, Any], custom_value)
if "known_paths" not in cached_value or "own_path" not in cached_value: if "known_paths" not in cached_value or "own_path" not in cached_value:
log.explain(f"'known_paths' or 'own_path' missing from cached value: {cached_value}") log.explain(f"'known_paths' or 'own_path' missing from cached value: {
cached_value}")
return [] return []
transformed_own_path = self._transformer.transform(element_path) transformed_own_path = self._transformer.transform(element_path)
if cached_value["own_path"] != str(transformed_own_path): if cached_value["own_path"] != str(transformed_own_path):
log.explain( log.explain(
f"own_path '{transformed_own_path}' does not match cached value: '{cached_value['own_path']}" f"own_path '{transformed_own_path}' does not match cached value: '{
cached_value['own_path']}"
) )
return [] return []
return [PurePath(name) for name in cached_value["known_paths"]] return [PurePath(name) for name in cached_value["known_paths"]]
def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool: def _all_opencast_videos_locally_present(self, element: IliasPageElement, element_path: PurePath) -> bool:
log.explain_topic(f"Checking local cache for video {fmt_path(element_path)}") log.explain_topic(f"Checking local cache for video {
fmt_path(element_path)}")
if contained_videos := self._previous_contained_opencast_videos(element, element_path): if contained_videos := self._previous_contained_opencast_videos(element, element_path):
log.explain( log.explain(
f"The following contained videos are known: {','.join(map(fmt_path, contained_videos))}" f"The following contained videos are known: {
','.join(map(fmt_path, contained_videos))}"
) )
if all(self._output_dir.resolve(path).exists() for path in contained_videos): if all(self._output_dir.resolve(path).exists() for path in contained_videos):
log.explain("Found all known videos locally, skipping enumeration request") log.explain(
"Found all known videos locally, skipping enumeration request")
return True return True
log.explain("Missing at least one video, continuing with requests!") log.explain(
"Missing at least one video, continuing with requests!")
else: else:
log.explain("No local cache present") log.explain("No local cache present")
return False return False
@ -553,7 +610,8 @@ instance's greatest bottleneck.
def add_to_report(paths: list[str]) -> None: def add_to_report(paths: list[str]) -> None:
self.report.add_custom_value( self.report.add_custom_value(
_get_video_cache_key(element), _get_video_cache_key(element),
{"known_paths": paths, "own_path": str(self._transformer.transform(dl.path))} {"known_paths": paths, "own_path": str(
self._transformer.transform(dl.path))}
) )
async with dl as (bar, sink): async with dl as (bar, sink):
@ -580,8 +638,10 @@ instance's greatest bottleneck.
if not maybe_dl: if not maybe_dl:
continue continue
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
log.explain(f"Streaming video from real url {stream_element.url}") log.explain(f"Streaming video from real url {
contained_video_paths.append(str(self._transformer.transform(maybe_dl.path))) stream_element.url}")
contained_video_paths.append(
str(self._transformer.transform(maybe_dl.path)))
await self._stream_from_url(stream_element.url, sink, bar, is_video=True) await self._stream_from_url(stream_element.url, sink, bar, is_video=True)
add_to_report(contained_video_paths) add_to_report(contained_video_paths)
@ -701,7 +761,8 @@ instance's greatest bottleneck.
tasks: List[Awaitable[None]] = [] tasks: List[Awaitable[None]] = []
for elem in elements: for elem in elements:
tasks.append(asyncio.create_task(self._download_forum_thread(cl.path, elem))) tasks.append(asyncio.create_task(
self._download_forum_thread(cl.path, elem)))
# And execute them # And execute them
await self.gather(tasks) await self.gather(tasks)
@ -742,7 +803,8 @@ instance's greatest bottleneck.
elements: List[IliasLearningModulePage] = [] elements: List[IliasLearningModulePage] = []
async with cl: async with cl:
log.explain_topic(f"Parsing initial HTML page for {fmt_path(cl.path)}") log.explain_topic(f"Parsing initial HTML page for {
fmt_path(cl.path)}")
log.explain(f"URL: {element.url}") log.explain(f"URL: {element.url}")
soup = await self._get_page(element.url) soup = await self._get_page(element.url)
page = IliasPage(soup, element.url, element) page = IliasPage(soup, element.url, element)
@ -762,9 +824,11 @@ instance's greatest bottleneck.
tasks: List[Awaitable[None]] = [] tasks: List[Awaitable[None]] = []
for index, elem in enumerate(elements): for index, elem in enumerate(elements):
prev_url = elements[index - 1].title if index > 0 else None prev_url = elements[index - 1].title if index > 0 else None
next_url = elements[index + 1].title if index < len(elements) - 1 else None next_url = elements[index +
1].title if index < len(elements) - 1 else None
tasks.append(asyncio.create_task( tasks.append(asyncio.create_task(
self._download_learning_module_page(cl.path, elem, prev_url, next_url) self._download_learning_module_page(
cl.path, elem, prev_url, next_url)
)) ))
# And execute them # And execute them
@ -785,7 +849,8 @@ instance's greatest bottleneck.
next_element_url: Optional[str] = start_url next_element_url: Optional[str] = start_url
counter = 0 counter = 0
while next_element_url: while next_element_url:
log.explain_topic(f"Parsing HTML page for {fmt_path(path)} ({dir}-{counter})") log.explain_topic(f"Parsing HTML page for {
fmt_path(path)} ({dir}-{counter})")
log.explain(f"URL: {next_element_url}") log.explain(f"URL: {next_element_url}")
soup = await self._get_page(next_element_url) soup = await self._get_page(next_element_url)
page = IliasPage(soup, next_element_url, parent_element) page = IliasPage(soup, next_element_url, parent_element)
@ -817,13 +882,15 @@ instance's greatest bottleneck.
return return
if prev: if prev:
prev_p = self._transformer.transform(parent_path / (_sanitize_path_name(prev) + ".html")) prev_p = self._transformer.transform(
parent_path / (_sanitize_path_name(prev) + ".html"))
if prev_p: if prev_p:
prev = os.path.relpath(prev_p, my_path.parent) prev = os.path.relpath(prev_p, my_path.parent)
else: else:
prev = None prev = None
if next: if next:
next_p = self._transformer.transform(parent_path / (_sanitize_path_name(next) + ".html")) next_p = self._transformer.transform(
parent_path / (_sanitize_path_name(next) + ".html"))
if next_p: if next_p:
next = os.path.relpath(next_p, my_path.parent) next = os.path.relpath(next_p, my_path.parent)
else: else:
@ -832,7 +899,8 @@ instance's greatest bottleneck.
async with maybe_dl as (bar, sink): async with maybe_dl as (bar, sink):
content = element.content content = element.content
content = await self.internalize_images(content) content = await self.internalize_images(content)
sink.file.write(learning_module_template(content, maybe_dl.path.name, prev, next).encode("utf-8")) sink.file.write(learning_module_template(
content, maybe_dl.path.name, prev, next).encode("utf-8"))
sink.done() sink.done()
async def internalize_images(self, tag: Tag) -> Tag: async def internalize_images(self, tag: Tag) -> Tag:
@ -850,7 +918,8 @@ instance's greatest bottleneck.
continue continue
log.explain(f"Internalizing {url!r}") log.explain(f"Internalizing {url!r}")
img = await self._get_authenticated(url) img = await self._get_authenticated(url)
elem.attrs["src"] = "data:;base64," + base64.b64encode(img).decode() elem.attrs["src"] = "data:;base64," + \
base64.b64encode(img).decode()
if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"): if elem.name == "iframe" and elem.attrs.get("src", "").startswith("//"):
# For unknown reasons the protocol seems to be stripped. # For unknown reasons the protocol seems to be stripped.
elem.attrs["src"] = "https:" + elem.attrs["src"] elem.attrs["src"] = "https:" + elem.attrs["src"]
@ -880,7 +949,8 @@ instance's greatest bottleneck.
soup = soupify(await request.read()) soup = soupify(await request.read())
if IliasPage.is_logged_in(soup): if IliasPage.is_logged_in(soup):
return self._verify_page(soup, url, root_page_allowed) return self._verify_page(soup, url, root_page_allowed)
raise CrawlError(f"get_page failed even after authenticating on {url!r}") raise CrawlError(
f"get_page failed even after authenticating on {url!r}")
@staticmethod @staticmethod
def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup: def _verify_page(soup: BeautifulSoup, url: str, root_page_allowed: bool) -> BeautifulSoup:
@ -939,6 +1009,9 @@ instance's greatest bottleneck.
@_iorepeat(3, "Login", failure_is_error=True) @_iorepeat(3, "Login", failure_is_error=True)
async def _authenticate(self) -> None: async def _authenticate(self) -> None:
# fill the session with the correct cookies # fill the session with the correct cookies
if isinstance(self._login_type, ShibbolethLoginType):
await self._shibboleth_login.login(self.session)
else:
params = { params = {
"client_id": self._client_id, "client_id": self._client_id,
"cmd": "force_login", "cmd": "force_login",
@ -948,11 +1021,13 @@ instance's greatest bottleneck.
login_form = login_page.find("form", attrs={"name": "formlogin"}) login_form = login_page.find("form", attrs={"name": "formlogin"})
if login_form is None: if login_form is None:
raise CrawlError("Could not find the login form! Specified client id might be invalid.") raise CrawlError(
"Could not find the login form! Specified client id might be invalid.")
login_url = login_form.attrs.get("action") login_url = login_form.attrs.get("action")
if login_url is None: if login_url is None:
raise CrawlError("Could not find the action URL in the login form!") raise CrawlError(
"Could not find the action URL in the login form!")
username, password = await self._auth.credentials() username, password = await self._auth.credentials()
@ -973,7 +1048,8 @@ instance's greatest bottleneck.
# Normal ILIAS pages # Normal ILIAS pages
mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar") mainbar: Optional[Tag] = soup.find(class_="il-maincontrols-metabar")
if mainbar is not None: if mainbar is not None:
login_button = mainbar.find(attrs={"href": lambda x: x and "login.php" in x}) login_button = mainbar.find(
attrs={"href": lambda x: x and "login.php" in x})
shib_login = soup.find(id="button_shib_login") shib_login = soup.find(id="button_shib_login")
return not login_button and not shib_login return not login_button and not shib_login

View file

@ -1,8 +1,5 @@
from typing import Any, Dict, Optional, Union from typing import Any, Dict, Optional, Union
import aiohttp
import yarl
from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator from ...auth import Authenticator, TfaAuthenticator
from ...config import Config from ...config import Config
@ -10,12 +7,13 @@ from ...logging import log
from ...utils import soupify from ...utils import soupify
from ..crawler import CrawlError, CrawlWarning from ..crawler import CrawlError, CrawlWarning
from .async_helper import _iorepeat from .async_helper import _iorepeat
from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection from .ilias_web_crawler import IliasWebCrawler, IliasWebCrawlerSection, ShibbolethLoginType
from .shibboleth_login import ShibbolethLogin from .shibboleth_login import ShibbolethLogin
TargetType = Union[str, int] TargetType = Union[str, int]
_ILIAS_URL = "https://ilias.studium.kit.edu" # _ILIAS_URL = "https://ilias.studium.kit.edu"
# Base URL of the KIT ILIAS instance, returned by
# KitIliasWebCrawlerSection.base_url(). This crawler is KIT-specific, so it
# must not point at another university's server; other Shibboleth instances
# are configured through the generic crawler's base_url / login_type options.
_ILIAS_URL = "https://ilias.studium.kit.edu"
class KitShibbolethBackgroundLoginSuccessful: class KitShibbolethBackgroundLoginSuccessful:
@ -26,22 +24,8 @@ class KitIliasWebCrawlerSection(IliasWebCrawlerSection):
def base_url(self) -> str: def base_url(self) -> str:
return _ILIAS_URL return _ILIAS_URL
def client_id(self) -> str: def login(self) -> ShibbolethLoginType:
# KIT ILIAS uses the Shibboleth service for authentication. There's no return ShibbolethLoginType()
# use for a client id.
return "unused"
def tfa_auth(
self, authenticators: Dict[str, Authenticator]
) -> Optional[Authenticator]:
value: Optional[str] = self.s.get("tfa_auth")
if value is None:
return None
auth = authenticators.get(value)
if auth is None:
self.invalid_value("tfa_auth", value,
"No such auth section exists")
return auth
class KitIliasWebCrawler(IliasWebCrawler): class KitIliasWebCrawler(IliasWebCrawler):

View file

@ -1,6 +1,7 @@
from typing import Any, Optional from typing import Any, Optional
import aiohttp import aiohttp
import yarl
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from ...auth import Authenticator, TfaAuthenticator from ...auth import Authenticator, TfaAuthenticator
@ -9,10 +10,6 @@ from ...utils import soupify
from ..crawler import CrawlError from ..crawler import CrawlError
class ShibbolethBackgroundLoginSuccessful:
pass
class ShibbolethLogin: class ShibbolethLogin:
""" """
Login via shibboleth system. Login via shibboleth system.
@ -59,7 +56,6 @@ class ShibbolethLogin:
# Equivalent: Enter credentials in # Equivalent: Enter credentials in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = str(shib_url.origin()) + action url = str(shib_url.origin()) + action
log.print(f"{url=}")
username, password = await self._auth.credentials() username, password = await self._auth.credentials()
data = { data = {
"_eventId_proceed": "", "_eventId_proceed": "",
@ -77,7 +73,7 @@ class ShibbolethLogin:
) )
if self._tfa_required(soup): if self._tfa_required(soup):
soup = await self._authenticate_tfa(sess, soup) soup = await self._authenticate_tfa(sess, soup, shib_url)
if not self._login_successful(soup): if not self._login_successful(soup):
self._auth.invalidate_credentials() self._auth.invalidate_credentials()
@ -94,7 +90,7 @@ class ShibbolethLogin:
await sess.post(url, data=data) await sess.post(url, data=data)
async def _authenticate_tfa( async def _authenticate_tfa(
self, session: aiohttp.ClientSession, soup: BeautifulSoup self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
) -> BeautifulSoup: ) -> BeautifulSoup:
if not self._tfa_auth: if not self._tfa_auth:
self._tfa_auth = TfaAuthenticator("ilias-anon-tfa") self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
@ -105,16 +101,17 @@ class ShibbolethLogin:
# credentials rather than after asking. # credentials rather than after asking.
form = soup.find("form", {"method": "post"}) form = soup.find("form", {"method": "post"})
action = form["action"] action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
# Equivalent: Enter token in # Equivalent: Enter token in
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
url = "https://idp.scc.kit.edu" + action url = str(shib_url.origin()) + action
username, password = await self._auth.credentials()
data = { data = {
"_eventId_proceed": "", "_eventId_proceed": "",
"j_tokenNumber": tfa_token, "j_tokenNumber": tfa_token,
"csrf_token": csrf_token,
} }
# The CSRF input is optional in the form. When present, send its value back
# under the same field name it was read from ("csrf_token"); the previous
# key was misspelled ("crsf_token"), so the token was posted under a name
# the form never declared.
if csrf_token_input := form.find("input", {"name": "csrf_token"}):
    data["csrf_token"] = csrf_token_input["value"]
return await _post(session, url, data) return await _post(session, url, data)
@staticmethod @staticmethod