From b2a2b5999bd38abfebfcc8ee3d48dcd90ccb59b6 Mon Sep 17 00:00:00 2001
From: I-Al-Istannen
Date: Sat, 15 May 2021 15:18:51 +0200
Subject: [PATCH] Implement ILIAS auth and crawl home page

This commit introduces the necessary machinery to authenticate with ILIAS
and crawl the home page. It can't do much yet and just silently fetches the
homepage.
---
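
A config section along these lines should exercise the new crawler (a sketch
only: the "course_id" and "tfa_auth" keys follow the IliasCrawlerSection
accessors below, while the "crawl:" section name and the "type" and "auth"
keys are assumptions about the generic crawler machinery, which this patch
does not show):

    [crawl:my-course]
    type = ilias
    course_id = 1234567
    auth = kit
    tfa_auth = kit-tfa

"tfa_auth" is resolved to the section "auth:kit-tfa", which therefore has to
exist; the key can be left out entirely if the account has no second factor.
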
 PFERD/crawlers/__init__.py |   3 +
 PFERD/crawlers/ilias.py    | 209 +++++++++++++++++++++++++++++++++++++
 PFERD/utils.py             |   8 ++
 setup.cfg                  |   1 +
 4 files changed, 221 insertions(+)
 create mode 100644 PFERD/crawlers/ilias.py

diff --git a/PFERD/crawlers/__init__.py b/PFERD/crawlers/__init__.py
index b2e5af5..0ae2ca3 100644
--- a/PFERD/crawlers/__init__.py
+++ b/PFERD/crawlers/__init__.py
@@ -5,6 +5,7 @@ from ..authenticator import Authenticator
 from ..conductor import TerminalConductor
 from ..config import Config
 from ..crawler import Crawler
+from .ilias import IliasCrawler, IliasCrawlerSection
 from .local import LocalCrawler, LocalCrawlerSection
 
 CrawlerConstructor = Callable[[
@@ -18,4 +19,6 @@
 CRAWLERS: Dict[str, CrawlerConstructor] = {
     "local": lambda n, s, c, t, a:
         LocalCrawler(n, LocalCrawlerSection(s), c, t),
+    "ilias": lambda n, s, c, t, a:
+        IliasCrawler(n, IliasCrawlerSection(s), c, t, a),
 }
diff --git a/PFERD/crawlers/ilias.py b/PFERD/crawlers/ilias.py
new file mode 100644
index 0000000..84a7c15
--- /dev/null
+++ b/PFERD/crawlers/ilias.py
@@ -0,0 +1,209 @@
+from configparser import SectionProxy
+from pathlib import PurePath
+from typing import Any, Dict, Optional
+
+import aiohttp
+from bs4 import BeautifulSoup
+
+from ..authenticator import Authenticator
+from ..conductor import TerminalConductor
+from ..config import Config
+from ..crawler import (Crawler, CrawlerSection, HttpCrawler, anoncritical,
+                       arepeat)
+from ..utils import soupify
+
+
+class IliasCrawlerSection(CrawlerSection):
+
+    def __init__(self, section: SectionProxy):
+        super().__init__(section)
+
+        if not self.course_id() and not self.element_url():
+            self.missing_value("course_id or element_url")
+
+    def course_id(self) -> Optional[str]:
+        return self.s.get("course_id")
+
+    def element_url(self) -> Optional[str]:
+        return self.s.get("element_url")
+
+    def base_url(self) -> str:
+        return self.s.get("ilias_url", "https://ilias.studium.kit.edu/")
+
+    def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
+        value = self.s.get("tfa_auth")
+        if not value:
+            return None
+
+        auth = authenticators.get(f"auth:{value}")
+        if auth is None:
+            self.invalid_value("tfa_auth", value, "No such auth section exists")
+        return auth
+
+
+class IliasCrawler(HttpCrawler):
+    def __init__(
+            self,
+            name: str,
+            section: IliasCrawlerSection,
+            config: Config,
+            conductor: TerminalConductor,
+            authenticators: Dict[str, Authenticator]
+    ):
+        super().__init__(name, section, config, conductor)
+
+        self._shibboleth_login = KitShibbolethLogin(
+            section.auth(authenticators),
+            section.tfa_auth(authenticators)
+        )
+        self._base_url = section.base_url()
+
+        self._course_id = section.course_id()
+        self._element_url = section.element_url()
+
+    async def crawl(self) -> None:
+        async with self.crawl_bar(PurePath("/")) as bar:
+            # The soup is discarded for now; this only verifies that an
+            # authenticated fetch of the home page works.
+            soup = await self._get_page(self._base_url)
+            self.print("[green]Gotcha![/]")
+
+    async def _get_page(self, url: str, retries_left: int = 3) -> BeautifulSoup:
+        if retries_left < 0:
+            # TODO: Proper exception
+            raise RuntimeError("Get page failed too often")
+        async with self.session.get(url) as request:
+            soup = soupify(await request.read())
+            if self._is_logged_in(soup):
+                return soup
+
+        # We weren't logged in, so authenticate and try again.
+        await self._shibboleth_login.login(self.session)
+
+        return await self._get_page(url, retries_left - 1)
+
+    @staticmethod
+    def _is_logged_in(soup: BeautifulSoup) -> bool:
+        # Normal ILIAS pages
+        userlog = soup.find("li", {"id": "userlog"})
+        if userlog is not None:
+            return True
+        # Video listing embeds do not have complete ILIAS html. Try to match them by
+        # their video listing table
+        video_table = soup.find(
+            recursive=True,
+            name="table",
+            attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")}
+        )
+        if video_table is not None:
+            return True
+        # The individual video player wrapper page has nothing of the above.
+        # Match it by its playerContainer.
+        if soup.select_one("#playerContainer") is not None:
+            return True
+        return False
+
+
+class KitShibbolethLogin:
+    """
+    Login via KIT's Shibboleth system.
+    """
+
+    def __init__(self, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]) -> None:
+        self._auth = authenticator
+        self._tfa_auth = tfa_authenticator
+
+    async def login(self, sess: aiohttp.ClientSession) -> None:
+        """
+        Performs the ILIAS Shibboleth authentication dance and saves the login
+        cookies it receives.
+
+        This function should only be called whenever it is detected that you're
+        not logged in. The cookies obtained should be good for a few minutes,
+        maybe even an hour or two.
+        """
+
+        # Equivalent: Click on "Mit KIT-Account anmelden" button in
+        # https://ilias.studium.kit.edu/login.php
+        url = "https://ilias.studium.kit.edu/Shibboleth.sso/Login"
+        data = {
+            "sendLogin": "1",
+            "idp_selection": "https://idp.scc.kit.edu/idp/shibboleth",
+            "target": "/shib_login.php",
+            "home_organization_selection": "Mit KIT-Account anmelden",
+        }
+        soup: BeautifulSoup = await _post(sess, url, data)
+
+        # Attempt to log in using credentials, if necessary
+        while not self._login_successful(soup):
+            # Searching the form here so that this fails before asking for
+            # credentials rather than after asking.
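+            # The IdP hides a fresh CSRF token in an <input> of this form; it
+            # has to be echoed back in the credential POST below, otherwise
+            # the IdP rejects the login attempt.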
+            form = soup.find("form", {"class": "full content", "method": "post"})
+            action = form["action"]
+            csrf_token = form.find("input", {"name": "csrf_token"})["value"]
+
+            # Equivalent: Enter credentials in
+            # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
+            url = "https://idp.scc.kit.edu" + action
+            username, password = await self._auth.credentials()
+            data = {
+                "_eventId_proceed": "",
+                "j_username": username,
+                "j_password": password,
+                "csrf_token": csrf_token
+            }
+            soup = await _post(sess, url, data)
+
+            if self._tfa_required(soup):
+                soup = await self._authenticate_tfa(sess, soup)
+
+            if not self._login_successful(soup):
+                self._auth.invalid_credentials()
+
+        # Equivalent: Being redirected via JS automatically
+        # (or clicking "Continue" if you have JS disabled)
+        relay_state = soup.find("input", {"name": "RelayState"})
+        saml_response = soup.find("input", {"name": "SAMLResponse"})
+        url = "https://ilias.studium.kit.edu/Shibboleth.sso/SAML2/POST"
+        data = {  # using the info obtained in the while loop above
+            "RelayState": relay_state["value"],
+            "SAMLResponse": saml_response["value"],
+        }
+        await sess.post(url, data=data)
+
+    async def _authenticate_tfa(
+            self,
+            session: aiohttp.ClientSession,
+            soup: BeautifulSoup
+    ) -> BeautifulSoup:
+        if not self._tfa_auth:
+            raise RuntimeError("No 'tfa_auth' present but you use two-factor authentication!")
+
+        # Searching the form here so that this fails before asking for
+        # credentials rather than after asking.
+        form = soup.find("form", {"method": "post"})
+        action = form["action"]
+
+        _, tfa_token = await self._tfa_auth.credentials()
+
+        # Equivalent: Enter token in
+        # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
+        url = "https://idp.scc.kit.edu" + action
+        data = {
+            "_eventId_proceed": "",
+            "j_tokenNumber": tfa_token
+        }
+        return await _post(session, url, data)
+
+    @staticmethod
+    def _login_successful(soup: BeautifulSoup) -> bool:
+        relay_state = soup.find("input", {"name": "RelayState"})
+        saml_response = soup.find("input", {"name": "SAMLResponse"})
+        return relay_state is not None and saml_response is not None
+
+    @staticmethod
+    def _tfa_required(soup: BeautifulSoup) -> bool:
+        return soup.find(id="j_tokenNumber") is not None
+
+
+async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
+    async with session.post(url, data=data) as response:
+        return soupify(await response.read())
diff --git a/PFERD/utils.py b/PFERD/utils.py
index 08017aa..d7c61ec 100644
--- a/PFERD/utils.py
+++ b/PFERD/utils.py
@@ -4,6 +4,8 @@
 import functools
 import getpass
 from typing import Any, Callable, Optional, TypeVar
 
+import bs4
+
 T = TypeVar("T")
 
@@ -23,6 +25,12 @@ async def ainput(prompt: str) -> str:
 async def agetpass(prompt: str) -> str:
     return await to_thread(lambda: getpass.getpass(prompt))
 
+def soupify(data: bytes) -> bs4.BeautifulSoup:
+    """
+    Parses HTML into a BeautifulSoup object.
+    """
+
+    return bs4.BeautifulSoup(data, "html.parser")
 
 async def prompt_yes_no(query: str, default: Optional[bool]) -> bool:
     """
diff --git a/setup.cfg b/setup.cfg
index f2806e2..18ff558 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -9,6 +9,7 @@ install_requires =
     aiohttp>=3.7.4.post0
+    beautifulsoup4>=4.9.3
     rich>=10.1.0
 
 [options.entry_points]
 console_scripts =
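
A quick, self-contained way to sanity-check the _is_logged_in heuristic from
this patch (needs only bs4; the HTML fragments are made up for illustration
and far smaller than real ILIAS pages):

    import bs4

    def soupify(data: bytes) -> bs4.BeautifulSoup:
        return bs4.BeautifulSoup(data, "html.parser")

    # An element with id="userlog" marks a normal, logged-in ILIAS page ...
    logged_in = soupify(b'<ul><li id="userlog">Max Mustermann</li></ul>')
    # ... while a bare login form contains no such element.
    logged_out = soupify(b'<form action="/login.php" method="post"></form>')

    for soup in (logged_in, logged_out):
        print(soup.find("li", {"id": "userlog"}) is not None)
    # Prints True, then False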