mirror of
https://github.com/Garmelon/PFERD.git
synced 2026-04-12 07:25:04 +02:00
Merge 555c8ac341 into e246053de2
This commit is contained in:
commit
cb0a05333a
3 changed files with 267 additions and 0 deletions
|
|
@ -7,6 +7,7 @@ from .crawler import Crawler, CrawlError, CrawlerSection # noqa: F401
|
|||
from .ilias import IliasWebCrawler, IliasWebCrawlerSection, KitIliasWebCrawler, KitIliasWebCrawlerSection
|
||||
from .kit_ipd_crawler import KitIpdCrawler, KitIpdCrawlerSection
|
||||
from .local_crawler import LocalCrawler, LocalCrawlerSection
|
||||
from .language_translator_crawler import LanguageTranslatorCrawler, LanguageTranslatorCrawlerSection
|
||||
|
||||
CrawlerConstructor = Callable[
|
||||
[
|
||||
|
|
@ -23,4 +24,5 @@ CRAWLERS: dict[str, CrawlerConstructor] = {
|
|||
"ilias-web": lambda n, s, c, a: IliasWebCrawler(n, IliasWebCrawlerSection(s), c, a),
|
||||
"kit-ilias-web": lambda n, s, c, a: KitIliasWebCrawler(n, KitIliasWebCrawlerSection(s), c, a),
|
||||
"kit-ipd": lambda n, s, c, a: KitIpdCrawler(n, KitIpdCrawlerSection(s), c, a),
|
||||
"language-translator": lambda n, s, c, a: LanguageTranslatorCrawler(n, LanguageTranslatorCrawlerSection(s), c, a),
|
||||
}
|
||||
|
|
|
|||
136
PFERD/crawl/language_translator_crawler.py
Normal file
136
PFERD/crawl/language_translator_crawler.py
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
from pathlib import PurePath
|
||||
from typing import Awaitable, Dict, List, Optional, Tuple
|
||||
from datetime import datetime
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from ..auth import Authenticator
|
||||
from ..config import Config
|
||||
from .crawler import CrawlError, FileSink, ProgressBar
|
||||
from ..utils import soupify
|
||||
from .http_crawler import HttpCrawler, HttpCrawlerSection
|
||||
from .shib_login import ShibbolethLogin
|
||||
|
||||
BASE_URL = "https://lt2srv.iar.kit.edu"
|
||||
|
||||
class LanguageTranslatorCrawlerSection(HttpCrawlerSection):
    """Config section for the KIT Language Translator crawler."""

    def tfa_auth(
        self, authenticators: Dict[str, Authenticator]
    ) -> Optional[Authenticator]:
        """
        Resolve the optional ``tfa_auth`` option to an authenticator.

        Returns None when the option is absent; reports an invalid value when
        the named auth section does not exist.
        """
        section_name: Optional[str] = self.s.get("tfa_auth")
        if section_name is None:
            return None
        resolved = authenticators.get(section_name)
        if resolved is None:
            self.invalid_value("tfa_auth", section_name, "No such auth section exists")
        return resolved

    def target(self) -> str:
        """Return the mandatory ``target`` URL option."""
        configured = self.s.get("target")
        if not configured:
            self.missing_value("target")
        return configured
||||
|
||||
class LanguageTranslatorCrawler(HttpCrawler):
    """
    Crawls archived translation sessions from the KIT Language Translator
    service and downloads their media files (mp4/mp3/webm).
    """

    def __init__(
        self,
        name: str,
        section: LanguageTranslatorCrawlerSection,
        config: Config,
        authenticators: Dict[str, Authenticator]
    ):
        # Setting a main authenticator for cookie sharing
        auth = section.auth(authenticators)
        super().__init__(name, section, config, shared_auth=auth)

        self._auth = auth
        self._url = section.target()
        self._tfa_auth = section.tfa_auth(authenticators)
        self._shibboleth_login = ShibbolethLogin(self._url, self._auth, self._tfa_auth)

    async def _run(self) -> None:
        auth_id = await self._current_auth_id()
        await self.authenticate(auth_id)

        maybe_cl = await self.crawl(PurePath("."))
        if not maybe_cl:
            return

        tasks: List[Awaitable[None]] = []

        async with maybe_cl:
            # The page URL returned by get_page is not needed here.
            page, _ = await self.get_page()
            links = []
            file_names = []
            for archive_div in page.find_all('div', class_='archivesession'):
                header_div = archive_div.find('div', class_='window-header')
                title = header_div.get_text(strip=True) if header_div else "Untitled"

                a_tag = archive_div.find('a', href=True)
                if a_tag and '/archivesession' in a_tag['href']:
                    media_url = BASE_URL + a_tag['href'].replace('archivesession', 'archivemedia')
                    links.append(media_url)

                    # Probe the media URL (GET, redirects disabled) to derive
                    # a file extension from the Content-Type header. The
                    # original comment claimed this was a HEAD request.
                    async with self.session.get(media_url, allow_redirects=False) as resp:
                        content_type = resp.headers.get('Content-Type', '')
                        extension = ''
                        if 'video/mp4' in content_type:
                            extension = '.mp4'
                        # NOTE(review): servers commonly send 'audio/mpeg' for
                        # mp3 — confirm 'audio/mp3' is what this server emits.
                        elif 'audio/mp3' in content_type:
                            extension = '.mp3'
                        elif 'video/webm' in content_type:
                            extension = '.webm'
                        file_names.append(f"{title}{extension}")

            for title, link in zip(file_names, links):
                etag, mtime = None, None  # await self._request_resource_version(link)
                tasks.append(self._download_file(PurePath("."), title, link, etag, mtime))

            await self.gather(tasks)

    async def _authenticate(self) -> None:
        await self._shibboleth_login.login(self.session)

    async def _download_file(
        self,
        parent: PurePath,
        title: str,
        url: str,
        etag: Optional[str],
        mtime: Optional[datetime]
    ) -> None:
        """Download a single media file, honoring the stored ETag if present."""
        element_path = parent / title

        prev_etag = self._get_previous_etag_from_report(element_path)
        etag_differs = None if prev_etag is None else prev_etag != etag

        maybe_dl = await self.download(element_path, etag_differs=etag_differs, mtime=mtime)
        if not maybe_dl:
            # keep storing the known file's etag
            if prev_etag:
                self._add_etag_to_report(element_path, prev_etag)
            return

        async with maybe_dl as (bar, sink):
            await self._stream_from_url(url, element_path, sink, bar)

    async def _stream_from_url(self, url: str, path: PurePath, sink: FileSink, bar: ProgressBar) -> None:
        """Stream the response body for *url* into *sink*, updating *bar*."""
        async with self.session.get(url, allow_redirects=False) as resp:
            if resp.status == 403:
                raise CrawlError("Received a 403. Are you within the KIT network/VPN?")
            if resp.content_length:
                bar.set_total(resp.content_length)

            async for data in resp.content.iter_chunked(1024):
                sink.file.write(data)
                bar.advance(len(data))

            sink.done()

            self._add_etag_to_report(path, resp.headers.get("ETag"))

    async def get_page(self) -> Tuple[BeautifulSoup, str]:
        """Fetch the target page and return (parsed soup, final URL after redirects)."""
        async with self.session.get(self._url) as request:
            # Read raw bytes once and let soupify handle decoding. The
            # original decoded to UTF-8 and immediately re-encoded, a
            # redundant round-trip that would raise on non-UTF-8 bodies.
            content = await request.read()
            return soupify(content), str(request.url)
|
||||
129
PFERD/crawl/shib_login.py
Normal file
129
PFERD/crawl/shib_login.py
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
from typing import Any, Optional, cast
|
||||
|
||||
import aiohttp
|
||||
import yarl
|
||||
from bs4 import BeautifulSoup, Tag
|
||||
|
||||
from ..auth import Authenticator, TfaAuthenticator
|
||||
from ..logging import log
|
||||
from ..utils import soupify
|
||||
from .crawler import CrawlError
|
||||
|
||||
|
||||
class ShibbolethLogin:
    """
    Login via shibboleth system.
    """

    def __init__(
        self, ilias_url: str, authenticator: Authenticator, tfa_authenticator: Optional[Authenticator]
    ) -> None:
        self._ilias_url = ilias_url
        self._auth = authenticator
        self._tfa_auth = tfa_authenticator

    async def login(self, sess: aiohttp.ClientSession) -> None:
        """
        Performs the Language Translator Shibboleth authentication dance and saves the login
        cookies it receives.

        This function should only be called whenever it is detected that you're
        not logged in. The cookies obtained should be good for a few minutes,
        maybe even an hour or two.
        """

        # Get Shibboleth login URL from initial request
        async with sess.get("https://lt2srv.iar.kit.edu/login") as response:
            url = str(response.url).replace("/auth?", "/auth/shib?")
        async with sess.get(url) as response:
            shib_url = response.url
            if str(shib_url).startswith("https://lt2srv.iar.kit.edu"):
                log.explain(
                    "Language Translator recognized our shib token and logged us in in the background, returning"
                )
                return
            soup: BeautifulSoup = soupify(await response.read())

        # Attempt to login using credentials, if necessary
        while not self._login_successful(soup):
            # Searching the form here so that this fails before asking for
            # credentials rather than after asking.
            form = cast(Tag, soup.find("form", {"method": "post"}))
            action = cast(str, form["action"])

            # Equivalent: Enter credentials in
            # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
            url = str(shib_url.origin()) + action
            username, password = await self._auth.credentials()
            data = {
                "_eventId_proceed": "",
                "j_username": username,
                "j_password": password,
                "fudis_web_authn_assertion_input": "",
            }
            if csrf_token_input := form.find("input", {"name": "csrf_token"}):
                data["csrf_token"] = csrf_token_input["value"]  # type: ignore
            soup = await _post(sess, url, data)

            if soup.find(id="attributeRelease"):
                # Fix: original message referenced "ILIAS", copy-pasted from
                # the ILIAS crawler — this is the Shibboleth IdP consent page.
                raise CrawlError(
                    "Shibboleth entitlements changed! "
                    "Please log in once in your browser and review them"
                )

            if self._tfa_required(soup):
                soup = await self._authenticate_tfa(sess, soup, shib_url)

            if not self._login_successful(soup):
                self._auth.invalidate_credentials()

        # Equivalent: Being redirected via JS automatically
        # (or clicking "Continue" if you have JS disabled)
        relay_state = cast(Tag, soup.find("input", {"name": "RelayState"}))
        saml_response = cast(Tag, soup.find("input", {"name": "SAMLResponse"}))
        # Fix: the original wrote `url = form = soup.find(...)["action"]`,
        # clobbering `form` with the action string. Bind them separately.
        post_form = cast(Tag, soup.find("form", {"method": "post"}))
        url = cast(str, post_form["action"])
        data = {  # using the info obtained in the while loop above
            "RelayState": cast(str, relay_state["value"]),
            "SAMLResponse": cast(str, saml_response["value"]),
        }
        await sess.post(url, data=data)

    async def _authenticate_tfa(
        self, session: aiohttp.ClientSession, soup: BeautifulSoup, shib_url: yarl.URL
    ) -> BeautifulSoup:
        """Complete the IdP's two-factor step and return the resulting page."""
        if not self._tfa_auth:
            self._tfa_auth = TfaAuthenticator("ilias-anon-tfa")

        tfa_token = await self._tfa_auth.password()

        # Searching the form here so that this fails before asking for
        # credentials rather than after asking.
        form = cast(Tag, soup.find("form", {"method": "post"}))
        action = cast(str, form["action"])

        # Equivalent: Enter token in
        # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
        url = str(shib_url.origin()) + action
        # Fix: the original also fetched username/password here, needlessly
        # re-prompting for credentials during the TFA step; only the token
        # is submitted in this form.
        data = {
            "_eventId_proceed": "",
            "fudis_otp_input": tfa_token,
        }
        if csrf_token_input := form.find("input", {"name": "csrf_token"}):
            data["csrf_token"] = csrf_token_input["value"]  # type: ignore
        return await _post(session, url, data)

    @staticmethod
    def _login_successful(soup: BeautifulSoup) -> bool:
        """Login is complete once the page carries the SAML handoff form fields."""
        relay_state = soup.find("input", {"name": "RelayState"})
        saml_response = soup.find("input", {"name": "SAMLResponse"})
        return relay_state is not None and saml_response is not None

    @staticmethod
    def _tfa_required(soup: BeautifulSoup) -> bool:
        """The IdP signals a pending TFA challenge via the fudiscr form."""
        return soup.find(id="fudiscr-form") is not None
|
||||
|
||||
|
||||
async def _post(session: aiohttp.ClientSession, url: str, data: Any) -> BeautifulSoup:
    """POST *data* to *url* and parse the response body as HTML."""
    async with session.post(url, data=data) as response:
        body = await response.read()
    return soupify(body)
|
||||
Loading…
Add table
Add a link
Reference in a new issue