Add Language Translator Crawler

This commit is contained in:
Florian Raith 2025-06-15 01:39:21 +02:00
parent 56e3065950
commit ef1cc0eca1
3 changed files with 268 additions and 0 deletions

View file

@ -7,6 +7,7 @@ from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
from .local_crawler import LocalCrawler, LocalCrawlerSection
from .language_translator_crawler import LanguageTranslatorCrawler, LanguageTranslatorCrawlerSection

CrawlerConstructor = Callable[[
    str,  # Name (without the "crawl:" prefix)
@ -24,4 +25,6 @@ CRAWLERS: Dict[str, CrawlerConstructor] = {
        KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
    "kit-ipd": lambda n, s, c, a:
        KitIpdCrawler(n, KitIpdCrawlerSection(s), c),
    "language-translator": lambda n, s, c, a:
        LanguageTranslatorCrawler(n, LanguageTranslatorCrawlerSection(s), c, a),
}

View file

@ -0,0 +1,136 @@
from pathlib import PurePath
from typing import Awaitable, Dict, List, Optional, Tuple
from datetime import datetime
from bs4 import BeautifulSoup
from ..auth import Authenticator
from ..config import Config
from .crawler import CrawlError, FileSink, ProgressBar
from ..utils import soupify
from .http_crawler import HttpCrawler, HttpCrawlerSection
from .shib_login import ShibbolethLogin
BASE_URL = "https://lt2srv.iar.kit.edu"
class LanguageTranslatorCrawlerSection(HttpCrawlerSection):
    """Config section for the language translator crawler."""

    def tfa_auth(
        self, authenticators: Dict[str, Authenticator]
    ) -> Optional[Authenticator]:
        """
        Resolve the optional TFA authenticator named in this section.

        Returns None when no "tfa_auth" key is configured; reports an invalid
        value when the named auth section does not exist.
        """
        name: Optional[str] = self.s.get("tfa_auth")
        if name is None:
            return None
        resolved = authenticators.get(name)
        if resolved is None:
            self.invalid_value("tfa_auth", name, "No such auth section exists")
        return resolved

    def target(self) -> str:
        """Return the mandatory "target" URL; reports it missing if absent."""
        url = self.s.get("target")
        if not url:
            self.missing_value("target")
        return url
class LanguageTranslatorCrawler(HttpCrawler):
    """
    Crawler for the KIT lecture translator archive (lt2srv.iar.kit.edu).

    Scrapes the configured target page for archived sessions and downloads
    each session's media file, authenticating via Shibboleth when needed.
    """

    def __init__(
        self,
        name: str,
        section: LanguageTranslatorCrawlerSection,
        config: Config,
        authenticators: Dict[str, Authenticator]
    ):
        # Setting a main authenticator for cookie sharing
        auth = section.auth(authenticators)
        super().__init__(name, section, config, shared_auth=auth)

        self._auth = auth
        # Page to scrape for archive sessions (from the section's "target" key)
        self._url = section.target()
        # Optional second-factor authenticator; may be None
        self._tfa_auth = section.tfa_auth(authenticators)

        self._shibboleth_login = ShibbolethLogin(self._url, self._auth, self._tfa_auth)

    async def _run(self) -> None:
        """Crawl the target page and download every listed media file."""
        auth_id = await self._current_auth_id()
        await self.authenticate(auth_id)

        maybe_cl = await self.crawl(PurePath("."))
        if not maybe_cl:
            # Crawling "." was skipped (e.g. excluded by the transformer)
            return

        tasks: List[Awaitable[None]] = []

        async with maybe_cl:
            page, url = await self.get_page()

            links = []
            file_names = []

            for archive_div in page.find_all('div', class_='archivesession'):
                # Session title comes from the window header; fall back to a
                # placeholder when the header is missing.
                header_div = archive_div.find('div', class_='window-header')
                title = header_div.get_text(strip=True) if header_div else "Untitled"

                a_tag = archive_div.find('a', href=True)
                if a_tag and '/archivesession' in a_tag['href']:
                    # The media file lives at the same path with
                    # "archivesession" swapped for "archivemedia".
                    media_url = BASE_URL + a_tag['href'].replace('archivesession', 'archivemedia')
                    links.append(media_url)

                    # Issue a GET (redirects disabled) only to inspect the
                    # Content-Type header and pick a file extension.
                    # NOTE(review): the original comment said "HEAD request",
                    # but this is a GET — the body is never read, only headers.
                    async with self.session.get(media_url, allow_redirects=False) as resp:
                        content_type = resp.headers.get('Content-Type', '')

                        extension = ''
                        if 'video/mp4' in content_type:
                            extension = '.mp4'
                        elif 'audio/mp3' in content_type:
                            # NOTE(review): 'audio/mp3' is a nonstandard MIME
                            # type (usually 'audio/mpeg') — confirm the server
                            # actually sends it.
                            extension = '.mp3'
                        elif 'video/webm' in content_type:
                            extension = '.webm'

                        file_names.append(f"{title}{extension}")

            for title, link in zip(file_names, links):
                # Version probing is disabled; every file is re-checked by the
                # download machinery instead.
                etag, mtime = None, None  # await self._request_resource_version(link)
                tasks.append(self._download_file(PurePath("."), title, link, etag, mtime))

        await self.gather(tasks)

    async def _authenticate(self) -> None:
        """Perform the Shibboleth login dance on the shared session."""
        await self._shibboleth_login.login(self.session)

    async def _download_file(
        self,
        parent: PurePath,
        title: str,
        url: str,
        etag: Optional[str],
        mtime: Optional[datetime]
    ) -> None:
        """
        Download a single media file to ``parent / title``, honoring the
        etag-based change detection of the base crawler.
        """
        element_path = parent / title
        prev_etag = self._get_previous_etag_from_report(element_path)
        # None means "unknown" (no previous etag recorded); otherwise compare.
        etag_differs = None if prev_etag is None else prev_etag != etag

        maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
        if not maybe_dl:
            # keep storing the known file's etag
            if prev_etag:
                self._add_etag_to_report(element_path, prev_etag)
            return

        async with maybe_dl as (bar, sink):
            await self._stream_from_url(url, element_path, sink, bar)

    async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
        """Stream ``url`` into ``sink`` in 1 KiB chunks, updating ``bar``."""
        async with self.session.get(url, allow_redirects=False) as resp:
            if resp.status == 403:
                raise CrawlError("Received a 403. Are you within the KIT network/VPN?")

            if resp.content_length:
                bar.set_total(resp.content_length)

            async for data in resp.content.iter_chunked(1024):
                sink.file.write(data)
                bar.advance(len(data))

            sink.done()

            # Remember the server's ETag so unchanged files can be skipped
            # on the next run.
            self._add_etag_to_report(path, resp.headers.get("ETag"))

    async def get_page(self) -> Tuple[BeautifulSoup, str]:
        """Fetch the target page and return (parsed soup, final URL)."""
        async with self.session.get(self._url) as request:
            content = (await request.read()).decode("utf-8")
        return soupify(content.encode("utf-8")), str(request.url)

129
PFERD/crawl/shib_login.py Normal file
View file

@ -0,0 +1,129 @@
from typing import Any, Optional, cast
import aiohttp
import yarl
from bs4 import BeautifulSoup, Tag
from ..auth import Authenticator, TfaAuthenticator
from ..logging import log
from ..utils import soupify
from .crawler import CrawlError
class ShibbolethLogin:
    """
    Login via shibboleth system.
    """

    def __init__(
        self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
    ) -> None:
        """
        Args:
            ilias_url: Base URL of the service protected by Shibboleth.
            authenticator: Supplies username/password credentials.
            tfa_authenticator: Optional second-factor authenticator; an
                anonymous prompt is created on demand when None.
        """
        self._ilias_url = ilias_url
        self._auth = authenticator
        self._tfa_auth = tfa_authenticator

    async def login(self, sess: aiohttp.ClientSession) -> None:
        """
        Performs the Language Translator Shibboleth authentication dance and saves the login
        cookies it receives.

        This function should only be called whenever it is detected that you're
        not logged in. The cookies obtained should be good for a few minutes,
        maybe even an hour or two.
        """
        # Get Shibboleth login URL from initial request
        async with sess.get("https://lt2srv.iar.kit.edu/login") as response:
            url = str(response.url).replace("/auth?", "/auth/shib?")

        async with sess.get(url) as response:
            shib_url = response.url
            if str(shib_url).startswith("https://lt2srv.iar.kit.edu"):
                log.explain(
                    "Language Translator recognized our shib token and logged us in in the background, returning"
                )
                return
            soup: BeautifulSoup = soupify(await response.read())

        # Attempt to login using credentials, if necessary
        while not self._login_successful(soup):
            # Searching the form here so that this fails before asking for
            # credentials rather than after asking.
            form = cast(Tag, soup.find("form", {"method": "post"}))
            action = cast(str, form["action"])

            # Equivalent: Enter credentials in
            # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
            url = str(shib_url.origin()) + action
            username, password = await self._auth.credentials()
            data = {
                "_eventId_proceed": "",
                "j_username": username,
                "j_password": password,
                "fudis_web_authn_assertion_input": "",
            }
            if csrf_token_input := form.find("input", {"name": "csrf_token"}):
                data["csrf_token"] = csrf_token_input["value"]  # type: ignore
            soup = await _post(sess, url, data)

            if soup.find(id="attributeRelease"):
                raise CrawlError(
                    "ILIAS Shibboleth entitlements changed! "
                    "Please log in once in your browser and review them"
                )

            if self._tfa_required(soup):
                soup = await self._authenticate_tfa(sess, soup, shib_url)

            if not self._login_successful(soup):
                self._auth.invalidate_credentials()

        # Equivalent: Being redirected via JS automatically
        # (or clicking "Continue" if you have JS disabled)
        relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
        saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
        # BUGFIX: the original wrote `url = form = soup.find(...)["action"]`,
        # clobbering `form` with a string. Re-find the form in the final soup
        # and take its action URL explicitly.
        form = cast(Tag, soup.find("form", {"method": "post"}))
        url = cast(str, form["action"])
        data = {  # using the info obtained in the while loop above
            "RelayState": cast(str, relay_state["value"]),
            "SAMLResponse": cast(str, saml_response["value"]),
        }
        await sess.post(url, data=data)

    async def _authenticate_tfa(
        self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
    ) -> BeautifulSoup:
        """Submit the second-factor token form and return the resulting page."""
        if not self._tfa_auth:
            self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")

        tfa_token = await self._tfa_auth.password()

        # Searching the form here so that this fails before asking for
        # credentials rather than after asking.
        form = cast(Tag, soup.find("form", {"method": "post"}))
        action = cast(str, form["action"])

        # Equivalent: Enter token in
        # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
        url = str(shib_url.origin()) + action
        # BUGFIX: dropped an unused `await self._auth.credentials()` call here;
        # it could prompt the user for credentials that were never used.
        data = {
            "_eventId_proceed": "",
            "fudis_otp_input": tfa_token,
        }
        if csrf_token_input := form.find("input", {"name": "csrf_token"}):
            data["csrf_token"] = csrf_token_input["value"]  # type: ignore
        return await _post(session, url, data)

    @staticmethod
    def _login_successful(soup: BeautifulSoup) -> bool:
        """A login succeeded iff the SAML hand-off form is present."""
        relay_state = soup.find("input", {"name": "RelayState"})
        saml_response = soup.find("input", {"name": "SAMLResponse"})
        return relay_state is not None and saml_response is not None

    @staticmethod
    def _tfa_required(soup: BeautifulSoup) -> bool:
        """The IdP requests a second factor via its "fudiscr-form"."""
        return soup.find(id="fudiscr-form") is not None
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
    """POST *data* to *url* and parse the response body into a soup."""
    async with session.post(url, data=data) as response:
        body = await response.read()
    return soupify(body)