use the async client to crawl illias

This commit is contained in:
be7a 2021-04-23 21:43:52 +02:00
parent 44aeb6c2eb
commit 411d4b91d5
No known key found for this signature in database
GPG key ID: 6510870A77F49A99
5 changed files with 131 additions and 84 deletions

View file

@ -65,3 +65,10 @@ class CookieJar:
client.cookies = self.cookies # type: ignore
return client
def create_async_client(self) -> httpx.AsyncClient:
"""Create a new async client using the cookie jar."""
        # TODO: timeout=None was the default behaviour of requests. An appropriate value should probably be set
client = httpx.AsyncClient(timeout=None)
client.cookies = self.cookies
return client

View file

@ -24,9 +24,9 @@ class IliasAuthenticator(abc.ABC):
"""
@abc.abstractmethod
def authenticate(self, client: httpx.Client) -> None:
async def authenticate(self, client: httpx.AsyncClient) -> None:
"""
Log a httpx client into this authenticator's ILIAS account.
Log a httpx AsyncClient into this authenticator's ILIAS account.
"""
@ -45,7 +45,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth")
def authenticate(self, sess: httpx.Client) -> None:
async def authenticate(self, client: httpx.AsyncClient) -> None:
"""
Performs the ILIAS Shibboleth authentication dance and saves the login
        cookies it receives.
@ -65,7 +65,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
"target": "/shib_login.php",
"home_organization_selection": "Mit KIT-Account anmelden",
}
soup = soupify(sess.post(url, data=data))
soup = soupify(await client.post(url, data=data))
# Attempt to login using credentials, if necessary
while not self._login_successful(soup):
@ -86,10 +86,10 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
"j_password": self._auth.password,
"csrf_token": csrf_token
}
soup = soupify(sess.post(url, data=data))
soup = soupify(await client.post(url, data=data))
if self._tfa_required(soup):
soup = self._authenticate_tfa(sess, soup)
soup = await self._authenticate_tfa(client, soup)
if not self._login_successful(soup):
print("Incorrect credentials.")
@ -105,11 +105,11 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
"RelayState": relay_state["value"],
"SAMLResponse": saml_response["value"],
}
sess.post(url, data=data)
await client.post(url, data=data)
def _authenticate_tfa(
async def _authenticate_tfa(
self,
client: httpx.Client,
client: httpx.AsyncClient,
soup: bs4.BeautifulSoup
) -> bs4.BeautifulSoup:
# Searching the form here so that this fails before asking for
@ -125,7 +125,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()
}
return soupify(client.post(url, data=data))
return soupify(await client.post(url, data=data))
@staticmethod
def _login_successful(soup: bs4.BeautifulSoup) -> bool:

View file

@ -2,18 +2,20 @@
Contains an ILIAS crawler alongside helper functions.
"""
from asyncio.queues import Queue
import datetime
import json
import logging
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Awaitable, Dict, List, Optional, Union
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit,
urlunsplit)
import bs4
import httpx
import asyncio
from ..errors import FatalException, retry_on_io_exception
from ..logging import PrettyLogger
@ -62,14 +64,16 @@ class IliasCrawlerEntry:
def __init__(
self,
path: Path,
url: Union[str, Callable[[], Optional[str]]],
url: Union[str, Callable[[], Awaitable[Optional[str]]]],
entry_type: IliasElementType,
modification_date: Optional[datetime.datetime]
):
self.path = path
if isinstance(url, str):
str_url = url
self.url: Callable[[], Optional[str]] = lambda: str_url
# TODO: Dirty hack, remove
future = asyncio.Future()
future.set_result(url)
self.url: Callable[[], Awaitable[Optional[str]]] = lambda: future
else:
self.url = url
self.entry_type = entry_type
@ -96,7 +100,7 @@ class IliasCrawler:
def __init__(
self,
base_url: str,
client: httpx.Client,
client: httpx.AsyncClient,
authenticator: IliasAuthenticator,
dir_filter: IliasDirectoryFilter
):
@ -121,17 +125,17 @@ class IliasCrawler:
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]:
async def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]:
"""
Crawls a given url *and all reachable elements in it*.
Args:
url {str} -- the *full* url to crawl
"""
start_entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), url)
return self._iterate_entries_to_download_infos(start_entries)
start_entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), url)
return await self._iterate_entries_to_download_infos(start_entries)
def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
async def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]:
"""
Starts the crawl process for a course, yielding a list of elements to (potentially)
download.
@ -147,28 +151,28 @@ class IliasCrawler:
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
if not self._is_course_id_valid(root_url, course_id):
if not await self._is_course_id_valid(root_url, course_id):
raise FatalException(
"Invalid course id? I didn't find anything looking like a course!"
)
# And treat it as a folder
entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url)
return self._iterate_entries_to_download_infos(entries)
entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), root_url)
return await self._iterate_entries_to_download_infos(entries)
def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: httpx.Response = self._client.get(root_url)
async def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: httpx.Response = await self._client.get(root_url)
        # We were redirected ==> Non-existent ID
if course_id not in str(response.url):
return False
link_element: bs4.Tag = self._get_page(root_url, {}).find(id="current_perma_link")
link_element: bs4.Tag = (await self._get_page(root_url, {})).find(id="current_perma_link")
if not link_element:
return False
# It wasn't a course but a category list, forum, etc.
return "crs_" in link_element.get("value")
def find_course_name(self, course_id: str) -> Optional[str]:
async def find_course_name(self, course_id: str) -> Optional[str]:
"""
Returns the name of a given course. None if it is not a valid course
or it could not be found.
@ -176,81 +180,111 @@ class IliasCrawler:
course_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}"
)
return self.find_element_name(course_url)
return await self.find_element_name(course_url)
def find_element_name(self, url: str) -> Optional[str]:
async def find_element_name(self, url: str) -> Optional[str]:
"""
Returns the name of the element at the given URL, if it can find one.
"""
focus_element: bs4.Tag = self._get_page(url, {}).find(id="il_mhead_t_focus")
focus_element: bs4.Tag = await self._get_page(url, {}).find(id="il_mhead_t_focus")
if not focus_element:
return None
return focus_element.text
def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
async def crawl_personal_desktop(self) -> List[IliasDownloadInfo]:
"""
Crawls the ILIAS personal desktop (and every subelements that can be reached from there).
Raises:
FatalException: if an unrecoverable error occurs
"""
entries: List[IliasCrawlerEntry] = self._crawl_folder(
entries: List[IliasCrawlerEntry] = await self._crawl_folder(
Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI"
)
return self._iterate_entries_to_download_infos(entries)
return await self._iterate_entries_to_download_infos(entries)
def _iterate_entries_to_download_infos(
self,
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
entries_to_process: List[IliasCrawlerEntry] = entries.copy()
while len(entries_to_process) > 0:
entry = entries_to_process.pop()
async def _crawl_worker(self, entries_to_process: asyncio.Queue, result: List[IliasDownloadInfo]):
while True:
entry = await entries_to_process.get()
if entry.entry_type == IliasElementType.EXTERNAL_LINK:
PRETTY.not_searching(entry.path, "external link")
entries_to_process.task_done()
continue
if entry.entry_type == IliasElementType.FORUM:
PRETTY.not_searching(entry.path, "forum")
entries_to_process.task_done()
continue
if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type):
PRETTY.not_searching(entry.path, "user filter")
entries_to_process.task_done()
continue
download_info = entry.to_download_info()
if download_info is not None:
result.append(download_info)
entries_to_process.task_done()
continue
url = entry.url()
url = await entry.url()
if url is None:
PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it")
entries_to_process.task_done()
continue
PRETTY.searching(entry.path)
if entry.entry_type == IliasElementType.EXERCISE_FOLDER:
entries_to_process += self._crawl_exercises(entry.path, url)
for task in await self._crawl_exercises(entry.path, url):
entries_to_process.put_nowait(task)
entries_to_process.task_done()
continue
if entry.entry_type == IliasElementType.REGULAR_FOLDER:
entries_to_process += self._crawl_folder(entry.path, url)
for task in await self._crawl_folder(entry.path, url):
entries_to_process.put_nowait(task)
entries_to_process.task_done()
continue
if entry.entry_type == IliasElementType.VIDEO_FOLDER:
entries_to_process += self._crawl_video_directory(entry.path, url)
for task in await self._crawl_video_directory(entry.path, url):
entries_to_process.put_nowait(task)
entries_to_process.task_done()
continue
PRETTY.warning(f"Unknown type: {entry.entry_type}!")
async def _iterate_entries_to_download_infos(
self,
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
crawl_queue = asyncio.Queue()
for entry in entries:
crawl_queue.put_nowait(entry)
workers = []
# TODO: Find proper worker limit
for _ in range(10):
worker = asyncio.create_task(self._crawl_worker(crawl_queue, result))
workers.append(worker)
await crawl_queue.join()
for worker in workers:
worker.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*workers, return_exceptions=True)
return result
def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
async def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl all files in a folder-like element.
"""
soup = self._get_page(url, {})
soup = await self._get_page(url, {})
if soup.find(id="headerimage"):
element: bs4.Tag = soup.find(id="headerimage")
@ -415,18 +449,18 @@ class IliasCrawler:
IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date)
]
def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]:
async def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl the video overview site.
"""
initial_soup = self._get_page(url, {})
initial_soup = await self._get_page(url, {})
        # The page is actually empty but contains a much needed token in the link below.
# That token can be used to fetch the *actual* video listing
content_link: bs4.Tag = initial_soup.select_one("#tab_series a")
# Fetch the actual video listing. The given parameters return all videos (max 800)
# in a standalone html page
video_list_soup = self._get_page(
video_list_soup = await self._get_page(
self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
@ -445,7 +479,7 @@ class IliasCrawler:
def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool:
return soup.find(id=re.compile(r"tab_page_sel.+")) is not None
def _crawl_paginated_video_directory(
async def _crawl_paginated_video_directory(
self,
video_dir_path: Path,
paged_video_list_soup: bs4.BeautifulSoup,
@ -475,7 +509,7 @@ class IliasCrawler:
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup)
table_id = match.group(1)
extended_video_page = self._get_page(
extended_video_page = await self._get_page(
second_stage_url,
{f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"}
)
@ -557,14 +591,14 @@ class IliasCrawler:
modification_time
)]
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]:
def inner() -> Optional[str]:
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Awaitable[Optional[str]]]:
async def inner() -> Optional[str]:
            # Fetch the actual video page. This is a small wrapper page initializing a javascript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere
# on the page, but defined in a JS object inside a script tag, passed to the player
# library.
# We do the impossible and RegEx the stream JSON object out of the page's HTML source
video_page_soup = soupify(self._client.get(play_url))
video_page_soup = soupify(await self._client.get(play_url))
regex: re.Pattern = re.compile(
r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE
)
@ -582,11 +616,11 @@ class IliasCrawler:
return video_url
return inner
def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]:
async def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]:
"""
Crawl files offered for download in exercises.
"""
soup = self._get_page(url, {})
soup = await self._get_page(url, {})
results: List[IliasCrawlerEntry] = []
@ -626,7 +660,7 @@ class IliasCrawler:
return results
@retry_on_io_exception(3, "fetching webpage")
def _get_page(self, url: str, params: Dict[str, Any],
async def _get_page(self, url: str, params: Dict[str, Any],
retry_count: int = 0) -> bs4.BeautifulSoup:
"""
Fetches a page from ILIAS, authenticating when needed.
@ -639,7 +673,7 @@ class IliasCrawler:
LOGGER.debug("Fetching %r", url)
response = self._client.get(url, params=params)
response = await self._client.get(url, params=params)
content_type = response.headers["content-type"]
if not content_type.startswith("text/html"):
@ -657,7 +691,7 @@ class IliasCrawler:
self._authenticator.authenticate(self._client)
return self._get_page(url, params, retry_count + 1)
return await self._get_page(url, params, retry_count + 1)
@staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:

View file

@ -5,10 +5,11 @@ import logging
import math
import os
from pathlib import Path, PurePath
from typing import Callable, List, Optional, Union
from typing import Callable, Awaitable, List, Optional, Union
import bs4
import httpx
import asyncio
from ..errors import retry_on_io_exception
from ..logging import PrettyLogger
@ -34,13 +35,14 @@ class IliasDownloadInfo(Transformable):
def __init__(
self,
path: PurePath,
url: Union[str, Callable[[], Optional[str]]],
url: Union[str, Callable[[], Awaitable[Optional[str]]]],
modifcation_date: Optional[datetime.datetime]
):
super().__init__(path)
if isinstance(url, str):
string_url = url
self.url: Callable[[], Optional[str]] = lambda: string_url
future = asyncio.Future()
future.set_result(url)
self.url: Callable[[], Optional[str]] = lambda: future
else:
self.url = url
self.modification_date = modifcation_date
@ -98,15 +100,15 @@ class IliasDownloader:
self._strategy = strategy
self._timeout = timeout
def download_all(self, infos: List[IliasDownloadInfo]) -> None:
async def download_all(self, infos: List[IliasDownloadInfo]) -> None:
"""
Download multiple files one after the other.
"""
for info in infos:
self.download(info)
tasks = [self.download(info) for info in infos]
await asyncio.gather(*tasks)
def download(self, info: IliasDownloadInfo) -> None:
async def download(self, info: IliasDownloadInfo) -> None:
"""
Download a file from ILIAS.
@ -122,15 +124,15 @@ class IliasDownloader:
tmp_file = self._tmp_dir.new_path()
@retry_on_io_exception(3, "downloading file")
def download_impl() -> bool:
if not self._try_download(info, tmp_file):
async def download_impl() -> bool:
if not await self._try_download(info, tmp_file):
LOGGER.info("Re-Authenticating due to download failure: %r", info)
self._authenticator.authenticate(self._client)
raise IOError("Scheduled retry")
else:
return True
if not download_impl():
if not await download_impl():
PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...")
return
@ -144,8 +146,8 @@ class IliasDownloader:
)
)
def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = info.url()
async def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = await info.url()
if url is None:
PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/")
return True

View file

@ -4,7 +4,8 @@ Convenience functions for using PFERD.
import logging
from pathlib import Path
from typing import Callable, List, Optional, Union
from typing import Callable, Awaitable, List, Optional, Union
import asyncio
from .authenticators import UserPassAuthenticator
from .cookie_jar import CookieJar
@ -72,11 +73,11 @@ class Pferd(Location):
inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password)
return KitShibbolethAuthenticator(inner_auth)
def _ilias(
async def _ilias(
self,
target: PathLike,
base_url: str,
crawl_function: Callable[[IliasCrawler], List[IliasDownloadInfo]],
crawl_function: Callable[[IliasCrawler], Awaitable[List[IliasDownloadInfo]]],
authenticator: IliasAuthenticator,
cookies: Optional[PathLike],
dir_filter: IliasDirectoryFilter,
@ -89,28 +90,31 @@ class Pferd(Location):
# pylint: disable=too-many-locals
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
client = cookie_jar.create_client()
async_client = cookie_jar.create_async_client()
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
crawler = IliasCrawler(base_url, client, authenticator, dir_filter)
crawler = IliasCrawler(base_url, async_client, authenticator, dir_filter)
downloader = IliasDownloader(tmp_dir, organizer, client,
authenticator, download_strategy, timeout)
cookie_jar.load_cookies()
info = crawl_function(crawler)
info = await crawl_function(crawler)
cookie_jar.save_cookies()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
downloader.download_all(transformed)
await downloader.download_all(transformed)
cookie_jar.save_cookies()
if clean:
organizer.cleanup()
await async_client.aclose()
return organizer
@swallow_and_print_errors
@ -161,7 +165,7 @@ class Pferd(Location):
authenticator = Pferd._get_authenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", course_id)
organizer = self._ilias(
organizer = asyncio.run(self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_course(course_id),
@ -173,7 +177,7 @@ class Pferd(Location):
clean=clean,
timeout=timeout,
file_conflict_resolver=file_conflict_resolver
)
))
self._download_summary.merge(organizer.download_summary)
@ -230,7 +234,7 @@ class Pferd(Location):
authenticator = Pferd._get_authenticator(username, password)
PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop")
organizer = self._ilias(
organizer = asyncio.run(self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_personal_desktop(),
@ -242,7 +246,7 @@ class Pferd(Location):
clean=clean,
timeout=timeout,
file_conflict_resolver=file_conflict_resolver
)
))
self._download_summary.merge(organizer.download_summary)
@ -298,7 +302,7 @@ class Pferd(Location):
if not full_url.startswith("https://ilias.studium.kit.edu"):
raise FatalException("Not a valid KIT ILIAS URL")
organizer = self._ilias(
organizer = asyncio.run(self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.recursive_crawl_url(full_url),
@ -310,7 +314,7 @@ class Pferd(Location):
clean=clean,
timeout=timeout,
file_conflict_resolver=file_conflict_resolver
)
))
self._download_summary.merge(organizer.download_summary)