diff --git a/PFERD/crawl/__init__.py b/PFERD/crawl/__init__.py
index 9a0e080..bb3f911 100644
--- a/PFERD/crawl/__init__.py
+++ b/PFERD/crawl/__init__.py
@@ -7,6 +7,7 @@ from .crawler import Crawler, CrawlError, CrawlerSection  # noqa: F401
 from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection
 from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
 from .local_crawler import LocalCrawler, LocalCrawlerSection
+from .language_translator_crawler import LanguageTranslatorCrawler, LanguageTranslatorCrawlerSection
 
 CrawlerConstructor = Callable[[
     str,  # Name (without the "crawl:" prefix)
@@ -24,4 +25,6 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
         KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
     "kit-ipd": lambda n, s, c, a:
         KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
+    "language-translator": lambda n, s, c, a:
+        LanguageTranslatorCrawler(n, LanguageTranslatorCrawlerSection(s), c, a),
 }
diff --git a/PFERD/crawl/language_translator_crawler.py b/PFERD/crawl/language_translator_crawler.py
new file mode 100644
index 0000000..a817416
--- /dev/null
+++ b/PFERD/crawl/language_translator_crawler.py
@@ -0,0 +1,148 @@
+from datetime import datetime
+from pathlib import PurePath
+from typing import Awaitable, Dict, List, Optional, Tuple
+
+from bs4 import BeautifulSoup
+
+from ..auth import Authenticator
+from ..config import Config
+from ..utils import soupify
+from .crawler import CrawlError, FileSink, ProgressBar
+from .http_crawler import HttpCrawler, HttpCrawlerSection
+from .shib_login import ShibbolethLogin
+
+BASE_URL = "https://lt2srv.iar.kit.edu"
+
+# Response Content-Type -> file extension. "audio/mpeg" is the official MIME
+# type for mp3; "audio/mp3" is kept for servers that report it anyway.
+_CONTENT_TYPE_EXTENSIONS = {
+    "video/mp4": ".mp4",
+    "audio/mp3": ".mp3",
+    "audio/mpeg": ".mp3",
+    "video/webm": ".webm",
+}
+
+
+class LanguageTranslatorCrawlerSection(HttpCrawlerSection):
+    """Config section for the KIT lecture translator crawler."""
+
+    def tfa_auth(self, authenticators: Dict[str, Authenticator]) -> Optional[Authenticator]:
+        """Return the configured second-factor authenticator, or None if unset."""
+        value: Optional[str] = self.s.get("tfa_auth")
+        if value is None:
+            return None
+        auth = authenticators.get(value)
+        if auth is None:
+            self.invalid_value("tfa_auth", value, "No such auth section exists")
+        return auth
+
+    def target(self) -> str:
+        """Return the mandatory target URL to crawl."""
+        target = self.s.get("target")
+        if not target:
+            self.missing_value("target")
+        return target
+
+
+class LanguageTranslatorCrawler(HttpCrawler):
+    """Crawls recorded sessions from the KIT lecture translator."""
+
+    def __init__(
+        self,
+        name: str,
+        section: LanguageTranslatorCrawlerSection,
+        config: Config,
+        authenticators: Dict[str, Authenticator]
+    ):
+        # Setting a main authenticator for cookie sharing
+        auth = section.auth(authenticators)
+        super().__init__(name, section, config, shared_auth=auth)
+
+        self._auth = auth
+        self._url = section.target()
+        self._tfa_auth = section.tfa_auth(authenticators)
+        self._shibboleth_login = ShibbolethLogin(self._url, self._auth, self._tfa_auth)
+
+    async def _run(self) -> None:
+        auth_id = await self._current_auth_id()
+        await self.authenticate(auth_id)
+
+        maybe_cl = await self.crawl(PurePath("."))
+        if not maybe_cl:
+            return
+
+        tasks: List[Awaitable[None]] = []
+
+        async with maybe_cl:
+            page, url = await self.get_page()
+
+            for archive_div in page.find_all("div", class_="archivesession"):
+                header_div = archive_div.find("div", class_="window-header")
+                title = header_div.get_text(strip=True) if header_div else "Untitled"
+
+                a_tag = archive_div.find("a", href=True)
+                if not (a_tag and "/archivesession" in a_tag["href"]):
+                    continue
+                media_url = BASE_URL + a_tag["href"].replace("archivesession", "archivemedia")
+
+                # Only the response headers are needed to pick a file
+                # extension; the body is never read here.
+                async with self.session.get(media_url, allow_redirects=False) as resp:
+                    content_type = resp.headers.get("Content-Type", "")
+                extension = next(
+                    (ext for mime, ext in _CONTENT_TYPE_EXTENSIONS.items() if mime in content_type),
+                    "",
+                )
+
+                etag, mtime = None, None  # await self._request_resource_version(media_url)
+                tasks.append(self._download_file(PurePath("."), f"{title}{extension}", media_url, etag, mtime))
+
+        await self.gather(tasks)
+
+    async def _authenticate(self) -> None:
+        await self._shibboleth_login.login(self.session)
+
+    async def _download_file(
+        self,
+        parent: PurePath,
+        title: str,
+        url: str,
+        etag: Optional[str],
+        mtime: Optional[datetime]
+    ) -> None:
+        """Download a single media file unless it is known to be unchanged."""
+        element_path = parent / title
+
+        prev_etag = self._get_previous_etag_from_report(element_path)
+        etag_differs = None if prev_etag is None else prev_etag != etag
+
+        maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
+        if not maybe_dl:
+            # keep storing the known file's etag
+            if prev_etag:
+                self._add_etag_to_report(element_path, prev_etag)
+            return
+
+        async with maybe_dl as (bar, sink):
+            await self._stream_from_url(url, element_path, sink, bar)
+
+    async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
+        """Stream `url` into `sink`, updating `bar` and recording the ETag."""
+        async with self.session.get(url, allow_redirects=False) as resp:
+            if resp.status == 403:
+                raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
+            if resp.content_length:
+                bar.set_total(resp.content_length)
+
+            async for data in resp.content.iter_chunked(1024):
+                sink.file.write(data)
+                bar.advance(len(data))
+
+            sink.done()
+
+            self._add_etag_to_report(path, resp.headers.get("ETag"))
+
+    async def get_page(self) -> Tuple[BeautifulSoup, str]:
+        """Return the target page as soup together with its final URL."""
+        async with self.session.get(self._url) as request:
+            return soupify(await request.read()), str(request.url)
diff --git a/PFERD/crawl/shib_login.py b/PFERD/crawl/shib_login.py
new file mode 100644
index 0000000..41cf96f
--- /dev/null
+++ b/PFERD/crawl/shib_login.py
@@ -0,0 +1,129 @@
+from typing import Any, Optional, cast
+
+import aiohttp
+import yarl
+from bs4 import BeautifulSoup, Tag
+
+from ..auth import Authenticator, TfaAuthenticator
+from ..logging import log
+from ..utils import soupify
+from .crawler import CrawlError
+
+
+class ShibbolethLogin:
+    """
+    Login via shibboleth system.
+    """
+
+    def __init__(
+        self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
+    ) -> None:
+        self._ilias_url = ilias_url
+        self._auth = authenticator
+        self._tfa_auth = tfa_authenticator
+
+    async def login(self, sess: aiohttp.ClientSession) -> None:
+        """
+        Performs the Language Translator Shibboleth authentication dance and saves the login
+        cookies it receieves.
+
+        This function should only be called whenever it is detected that you're
+        not logged in. The cookies obtained should be good for a few minutes,
+        maybe even an hour or two.
+        """
+
+        # Get Shibboleth login URL from initial request
+        async with sess.get("https://lt2srv.iar.kit.edu/login") as response:
+            url = str(response.url).replace("/auth?", "/auth/shib?")
+        async with sess.get(url) as response:
+            shib_url = response.url
+            if str(shib_url).startswith("https://lt2srv.iar.kit.edu"):
+                log.explain(
+                    "Language Translator recognized our shib token and logged us in in the background, returning"
+                )
+                return
+            soup: BeautifulSoup = soupify(await response.read())
+
+        # Attempt to login using credentials, if necessary
+        while not self._login_successful(soup):
+            # Searching the form here so that this fails before asking for
+            # credentials rather than after asking.
+            form = cast(Tag, soup.find("form", {"method": "post"}))
+            action = cast(str, form["action"])
+
+            # Equivalent: Enter credentials in
+            # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
+            url = str(shib_url.origin()) + action
+            username, password = await self._auth.credentials()
+            data = {
+                "_eventId_proceed": "",
+                "j_username": username,
+                "j_password": password,
+                "fudis_web_authn_assertion_input": "",
+            }
+            if csrf_token_input := form.find("input", {"name": "csrf_token"}):
+                data["csrf_token"] = csrf_token_input["value"]  # type: ignore
+            soup = await _post(sess, url, data)
+
+            if soup.find(id="attributeRelease"):
+                raise CrawlError(
+                    "ILIAS Shibboleth entitlements changed! "
+                    "Please log in once in your browser and review them"
+                )
+
+            if self._tfa_required(soup):
+                soup = await self._authenticate_tfa(sess, soup, shib_url)
+
+            if not self._login_successful(soup):
+                self._auth.invalidate_credentials()
+
+        # Equivalent: Being redirected via JS automatically
+        # (or clicking "Continue" if you have JS disabled)
+        relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
+        saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
+        form = cast(Tag, soup.find("form", {"method": "post"}))
+        url = cast(str, form["action"])
+        data = {  # using the info obtained in the while loop above
+            "RelayState": cast(str, relay_state["value"]),
+            "SAMLResponse": cast(str, saml_response["value"]),
+        }
+        await sess.post(url, data=data)
+
+    async def _authenticate_tfa(
+        self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
+    ) -> BeautifulSoup:
+        if not self._tfa_auth:
+            self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")
+
+        tfa_token = await self._tfa_auth.password()
+
+        # Searching the form here so that this fails before asking for
+        # the token rather than after asking.
+        form = cast(Tag, soup.find("form", {"method": "post"}))
+        action = cast(str, form["action"])
+
+        # Equivalent: Enter token in
+        # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
+        url = str(shib_url.origin()) + action
+        data = {
+            "_eventId_proceed": "",
+            "fudis_otp_input": tfa_token,
+        }
+        if csrf_token_input := form.find("input", {"name": "csrf_token"}):
+            data["csrf_token"] = csrf_token_input["value"]  # type: ignore
+        return await _post(session, url, data)
+
+    @staticmethod
+    def _login_successful(soup: BeautifulSoup) -> bool:
+        relay_state = soup.find("input", {"name": "RelayState"})
+        saml_response = soup.find("input", {"name": "SAMLResponse"})
+        return relay_state is not None and saml_response is not None
+
+    @staticmethod
+    def _tfa_required(soup: BeautifulSoup) -> bool:
+        return soup.find(id="fudiscr-form") is not None
+
+
+async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
+    async with session.post(url, data=data) as response:
+        return soupify(await response.read())