diff --git a/PFERD/cookie_jar.py b/PFERD/cookie_jar.py index 754979c..67ac69a 100644 --- a/PFERD/cookie_jar.py +++ b/PFERD/cookie_jar.py @@ -65,3 +65,10 @@ class CookieJar: client.cookies = self.cookies # type: ignore return client + + def create_async_client(self) -> httpx.AsyncClient: + """Create a new async client using the cookie jar.""" + # TODO: timeout=None was the default behaviour of requests. An appropriate value should probably be set + client = httpx.AsyncClient(timeout=None) + client.cookies = self.cookies + return client diff --git a/PFERD/ilias/authenticators.py b/PFERD/ilias/authenticators.py index c1f4087..39e8bb5 100644 --- a/PFERD/ilias/authenticators.py +++ b/PFERD/ilias/authenticators.py @@ -24,9 +24,9 @@ class IliasAuthenticator(abc.ABC): """ @abc.abstractmethod - def authenticate(self, client: httpx.Client) -> None: + async def authenticate(self, client: httpx.AsyncClient) -> None: """ - Log a httpx client into this authenticator's ILIAS account. + Log a httpx AsyncClient into this authenticator's ILIAS account. """ @@ -45,7 +45,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator): self._tfa_auth = TfaAuthenticator("KIT ILIAS Shibboleth") - def authenticate(self, sess: httpx.Client) -> None: + async def authenticate(self, client: httpx.AsyncClient) -> None: """ Performs the ILIAS Shibboleth authentication dance and saves the login cookies it receieves.
@@ -65,7 +65,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator): "target": "/shib_login.php", "home_organization_selection": "Mit KIT-Account anmelden", } - soup = soupify(sess.post(url, data=data)) + soup = soupify(await client.post(url, data=data)) # Attempt to login using credentials, if necessary while not self._login_successful(soup): @@ -86,10 +86,10 @@ class KitShibbolethAuthenticator(IliasAuthenticator): "j_password": self._auth.password, "csrf_token": csrf_token } - soup = soupify(sess.post(url, data=data)) + soup = soupify(await client.post(url, data=data)) if self._tfa_required(soup): - soup = self._authenticate_tfa(sess, soup) + soup = await self._authenticate_tfa(client, soup) if not self._login_successful(soup): print("Incorrect credentials.") @@ -105,11 +105,11 @@ class KitShibbolethAuthenticator(IliasAuthenticator): "RelayState": relay_state["value"], "SAMLResponse": saml_response["value"], } - sess.post(url, data=data) + await client.post(url, data=data) - def _authenticate_tfa( + async def _authenticate_tfa( self, - client: httpx.Client, + client: httpx.AsyncClient, soup: bs4.BeautifulSoup ) -> bs4.BeautifulSoup: # Searching the form here so that this fails before asking for @@ -125,7 +125,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator): "_eventId_proceed": "", "j_tokenNumber": self._tfa_auth.get_token() } - return soupify(client.post(url, data=data)) + return soupify(await client.post(url, data=data)) @staticmethod def _login_successful(soup: bs4.BeautifulSoup) -> bool: diff --git a/PFERD/ilias/crawler.py b/PFERD/ilias/crawler.py index 9726a4f..a43b430 100644 --- a/PFERD/ilias/crawler.py +++ b/PFERD/ilias/crawler.py @@ -2,18 +2,20 @@ Contains an ILIAS crawler alongside helper functions. 
""" +from asyncio.queues import Queue import datetime import json import logging import re from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Awaitable, Dict, List, Optional, Union from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit, urlunsplit) import bs4 import httpx +import asyncio from ..errors import FatalException, retry_on_io_exception from ..logging import PrettyLogger @@ -62,14 +64,16 @@ class IliasCrawlerEntry: def __init__( self, path: Path, - url: Union[str, Callable[[], Optional[str]]], + url: Union[str, Callable[[], Awaitable[Optional[str]]]], entry_type: IliasElementType, modification_date: Optional[datetime.datetime] ): self.path = path if isinstance(url, str): - str_url = url - self.url: Callable[[], Optional[str]] = lambda: str_url + # TODO: Dirty hack, remove + future = asyncio.Future() + future.set_result(url) + self.url: Callable[[], Awaitable[Optional[str]]] = lambda: future else: self.url = url self.entry_type = entry_type @@ -96,7 +100,7 @@ class IliasCrawler: def __init__( self, base_url: str, - client: httpx.Client, + client: httpx.AsyncClient, authenticator: IliasAuthenticator, dir_filter: IliasDirectoryFilter ): @@ -121,17 +125,17 @@ class IliasCrawler: return urlunsplit((scheme, netloc, path, new_query_string, fragment)) - def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]: + async def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]: """ Crawls a given url *and all reachable elements in it*. 
Args: url {str} -- the *full* url to crawl """ - start_entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), url) - return self._iterate_entries_to_download_infos(start_entries) + start_entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), url) + return await self._iterate_entries_to_download_infos(start_entries) - def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]: + async def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]: """ Starts the crawl process for a course, yielding a list of elements to (potentially) download. @@ -147,28 +151,28 @@ class IliasCrawler: self._base_url + "/goto.php", "target", f"crs_{course_id}" ) - if not self._is_course_id_valid(root_url, course_id): + if not await self._is_course_id_valid(root_url, course_id): raise FatalException( "Invalid course id? I didn't find anything looking like a course!" ) # And treat it as a folder - entries: List[IliasCrawlerEntry] = self._crawl_folder(Path(""), root_url) - return self._iterate_entries_to_download_infos(entries) + entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), root_url) + return await self._iterate_entries_to_download_infos(entries) - def _is_course_id_valid(self, root_url: str, course_id: str) -> bool: - response: httpx.Response = self._client.get(root_url) + async def _is_course_id_valid(self, root_url: str, course_id: str) -> bool: + response: httpx.Response = await self._client.get(root_url) # We were redirected ==> Non-existant ID if course_id not in str(response.url): return False - link_element: bs4.Tag = self._get_page(root_url, {}).find(id="current_perma_link") + link_element: bs4.Tag = (await self._get_page(root_url, {})).find(id="current_perma_link") if not link_element: return False # It wasn't a course but a category list, forum, etc. 
return "crs_" in link_element.get("value") - def find_course_name(self, course_id: str) -> Optional[str]: + async def find_course_name(self, course_id: str) -> Optional[str]: """ Returns the name of a given course. None if it is not a valid course or it could not be found. @@ -176,81 +180,111 @@ class IliasCrawler: course_url = self._url_set_query_param( self._base_url + "/goto.php", "target", f"crs_{course_id}" ) - return self.find_element_name(course_url) + return await self.find_element_name(course_url) - def find_element_name(self, url: str) -> Optional[str]: + async def find_element_name(self, url: str) -> Optional[str]: """ Returns the name of the element at the given URL, if it can find one. """ - focus_element: bs4.Tag = self._get_page(url, {}).find(id="il_mhead_t_focus") + focus_element: bs4.Tag = (await self._get_page(url, {})).find(id="il_mhead_t_focus") if not focus_element: return None return focus_element.text - def crawl_personal_desktop(self) -> List[IliasDownloadInfo]: + async def crawl_personal_desktop(self) -> List[IliasDownloadInfo]: """ Crawls the ILIAS personal desktop (and every subelements that can be reached from there).
Raises: FatalException: if an unrecoverable error occurs """ - entries: List[IliasCrawlerEntry] = self._crawl_folder( + entries: List[IliasCrawlerEntry] = await self._crawl_folder( Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI" ) - return self._iterate_entries_to_download_infos(entries) + return await self._iterate_entries_to_download_infos(entries) - def _iterate_entries_to_download_infos( - self, - entries: List[IliasCrawlerEntry] - ) -> List[IliasDownloadInfo]: - result: List[IliasDownloadInfo] = [] - entries_to_process: List[IliasCrawlerEntry] = entries.copy() - while len(entries_to_process) > 0: - entry = entries_to_process.pop() + async def _crawl_worker(self, entries_to_process: asyncio.Queue, result: List[IliasDownloadInfo]): + while True: + entry = await entries_to_process.get() if entry.entry_type == IliasElementType.EXTERNAL_LINK: PRETTY.not_searching(entry.path, "external link") + entries_to_process.task_done() continue if entry.entry_type == IliasElementType.FORUM: PRETTY.not_searching(entry.path, "forum") + entries_to_process.task_done() continue if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type): PRETTY.not_searching(entry.path, "user filter") + entries_to_process.task_done() continue download_info = entry.to_download_info() if download_info is not None: result.append(download_info) + entries_to_process.task_done() continue - url = entry.url() + url = await entry.url() if url is None: PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it") + entries_to_process.task_done() continue PRETTY.searching(entry.path) if entry.entry_type == IliasElementType.EXERCISE_FOLDER: - entries_to_process += self._crawl_exercises(entry.path, url) + for task in await self._crawl_exercises(entry.path, url): + entries_to_process.put_nowait(task) + entries_to_process.task_done() continue if entry.entry_type == IliasElementType.REGULAR_FOLDER: - entries_to_process += self._crawl_folder(entry.path, url) + for 
task in await self._crawl_folder(entry.path, url): + entries_to_process.put_nowait(task) + entries_to_process.task_done() continue if entry.entry_type == IliasElementType.VIDEO_FOLDER: - entries_to_process += self._crawl_video_directory(entry.path, url) + for task in await self._crawl_video_directory(entry.path, url): + entries_to_process.put_nowait(task) + entries_to_process.task_done() continue PRETTY.warning(f"Unknown type: {entry.entry_type}!") + + async def _iterate_entries_to_download_infos( + self, + entries: List[IliasCrawlerEntry] + ) -> List[IliasDownloadInfo]: + result: List[IliasDownloadInfo] = [] + crawl_queue = asyncio.Queue() + for entry in entries: + crawl_queue.put_nowait(entry) + + workers = [] + + # TODO: Find proper worker limit + for _ in range(10): + worker = asyncio.create_task(self._crawl_worker(crawl_queue, result)) + workers.append(worker) + + await crawl_queue.join() + + for worker in workers: + worker.cancel() + + # Wait until all worker tasks are cancelled. + await asyncio.gather(*workers, return_exceptions=True) return result - def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]: + async def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]: """ Crawl all files in a folder-like element. """ - soup = self._get_page(url, {}) + soup = await self._get_page(url, {}) if soup.find(id="headerimage"): element: bs4.Tag = soup.find(id="headerimage") @@ -415,18 +449,18 @@ class IliasCrawler: IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date) ] - def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]: + async def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]: """ Crawl the video overview site. """ - initial_soup = self._get_page(url, {}) + initial_soup = await self._get_page(url, {}) # The page is actually emtpy but contains a much needed token in the link below. 
# That token can be used to fetch the *actual* video listing content_link: bs4.Tag = initial_soup.select_one("#tab_series a") # Fetch the actual video listing. The given parameters return all videos (max 800) # in a standalone html page - video_list_soup = self._get_page( + video_list_soup = await self._get_page( self._abs_url_from_link(content_link), {"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} ) @@ -445,7 +479,7 @@ class IliasCrawler: def _is_paginated_video_page(soup: bs4.BeautifulSoup) -> bool: return soup.find(id=re.compile(r"tab_page_sel.+")) is not None - def _crawl_paginated_video_directory( + async def _crawl_paginated_video_directory( self, video_dir_path: Path, paged_video_list_soup: bs4.BeautifulSoup, @@ -475,7 +509,7 @@ class IliasCrawler: return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup) table_id = match.group(1) - extended_video_page = self._get_page( + extended_video_page = await self._get_page( second_stage_url, {f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} ) @@ -557,14 +591,14 @@ class IliasCrawler: modification_time )] - def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Optional[str]]: - def inner() -> Optional[str]: + def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Awaitable[Optional[str]]]: + async def inner() -> Optional[str]: # Fetch the actual video page. This is a small wrapper page initializing a javscript # player. Sadly we can not execute that JS. The actual video stream url is nowhere # on the page, but defined in a JS object inside a script tag, passed to the player # library. 
# We do the impossible and RegEx the stream JSON object out of the page's HTML source - video_page_soup = soupify(self._client.get(play_url)) + video_page_soup = soupify(await self._client.get(play_url)) regex: re.Pattern = re.compile( r"({\"streams\"[\s\S]+?),\s*{\"paella_config_file", re.IGNORECASE ) @@ -582,11 +616,11 @@ class IliasCrawler: return video_url return inner - def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]: + async def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]: """ Crawl files offered for download in exercises. """ - soup = self._get_page(url, {}) + soup = await self._get_page(url, {}) results: List[IliasCrawlerEntry] = [] @@ -626,7 +660,7 @@ class IliasCrawler: return results @retry_on_io_exception(3, "fetching webpage") - def _get_page(self, url: str, params: Dict[str, Any], + async def _get_page(self, url: str, params: Dict[str, Any], retry_count: int = 0) -> bs4.BeautifulSoup: """ Fetches a page from ILIAS, authenticating when needed. 
@@ -639,7 +673,7 @@ class IliasCrawler: LOGGER.debug("Fetching %r", url) - response = self._client.get(url, params=params) + response = await self._client.get(url, params=params) content_type = response.headers["content-type"] if not content_type.startswith("text/html"): @@ -657,7 +691,7 @@ class IliasCrawler: - self._authenticator.authenticate(self._client) - return self._get_page(url, params, retry_count + 1) + await self._authenticator.authenticate(self._client) + return await self._get_page(url, params, retry_count + 1) @staticmethod def _is_logged_in(soup: bs4.BeautifulSoup) -> bool: diff --git a/PFERD/ilias/downloader.py b/PFERD/ilias/downloader.py index 005dfec..9f5049e 100644 --- a/PFERD/ilias/downloader.py +++ b/PFERD/ilias/downloader.py @@ -5,10 +5,11 @@ import logging import math import os from pathlib import Path, PurePath -from typing import Callable, List, Optional, Union +from typing import Callable, Awaitable, List, Optional, Union import bs4 import httpx +import asyncio from ..errors import retry_on_io_exception from ..logging import PrettyLogger @@ -34,13 +35,14 @@ class IliasDownloadInfo(Transformable): def __init__( self, path: PurePath, - url: Union[str, Callable[[], Optional[str]]], + url: Union[str, Callable[[], Awaitable[Optional[str]]]], modifcation_date: Optional[datetime.datetime] ): super().__init__(path) if isinstance(url, str): - string_url = url - self.url: Callable[[], Optional[str]] = lambda: string_url + future = asyncio.Future() + future.set_result(url) + self.url: Callable[[], Awaitable[Optional[str]]] = lambda: future else: self.url = url self.modification_date = modifcation_date @@ -98,15 +100,15 @@ class IliasDownloader: self._strategy = strategy self._timeout = timeout - def download_all(self, infos: List[IliasDownloadInfo]) -> None: + async def download_all(self, infos: List[IliasDownloadInfo]) -> None: """ Download multiple files one after the other.
""" - for info in infos: - self.download(info) + tasks = [self.download(info) for info in infos] + await asyncio.gather(*tasks) - def download(self, info: IliasDownloadInfo) -> None: + async def download(self, info: IliasDownloadInfo) -> None: """ Download a file from ILIAS. @@ -122,15 +124,15 @@ class IliasDownloader: tmp_file = self._tmp_dir.new_path() @retry_on_io_exception(3, "downloading file") - def download_impl() -> bool: - if not self._try_download(info, tmp_file): + async def download_impl() -> bool: + if not await self._try_download(info, tmp_file): LOGGER.info("Re-Authenticating due to download failure: %r", info) - self._authenticator.authenticate(self._client) + await self._authenticator.authenticate(self._client) raise IOError("Scheduled retry") else: return True - if not download_impl(): + if not await download_impl(): PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...") return @@ -144,8 +146,8 @@ class IliasDownloader: ) ) - def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool: - url = info.url() + async def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool: + url = await info.url() if url is None: PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/") return True diff --git a/PFERD/pferd.py b/PFERD/pferd.py index 3efe8f2..339cc1b 100644 --- a/PFERD/pferd.py +++ b/PFERD/pferd.py @@ -4,7 +4,8 @@ Convenience functions for using PFERD.
import logging from pathlib import Path -from typing import Callable, List, Optional, Union +from typing import Callable, Awaitable, List, Optional, Union +import asyncio from .authenticators import UserPassAuthenticator from .cookie_jar import CookieJar @@ -72,11 +73,11 @@ class Pferd(Location): inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password) return KitShibbolethAuthenticator(inner_auth) - def _ilias( + async def _ilias( self, target: PathLike, base_url: str, - crawl_function: Callable[[IliasCrawler], List[IliasDownloadInfo]], + crawl_function: Callable[[IliasCrawler], Awaitable[List[IliasDownloadInfo]]], authenticator: IliasAuthenticator, cookies: Optional[PathLike], dir_filter: IliasDirectoryFilter, @@ -89,28 +90,31 @@ class Pferd(Location): # pylint: disable=too-many-locals cookie_jar = CookieJar(to_path(cookies) if cookies else None) client = cookie_jar.create_client() + async_client = cookie_jar.create_async_client() tmp_dir = self._tmp_dir.new_subdir() organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver) - crawler = IliasCrawler(base_url, client, authenticator, dir_filter) + crawler = IliasCrawler(base_url, async_client, authenticator, dir_filter) downloader = IliasDownloader(tmp_dir, organizer, client, authenticator, download_strategy, timeout) cookie_jar.load_cookies() - info = crawl_function(crawler) + info = await crawl_function(crawler) cookie_jar.save_cookies() + transformed = apply_transform(transform, info) if self._test_run: self._print_transformables(transformed) return organizer - downloader.download_all(transformed) + await downloader.download_all(transformed) cookie_jar.save_cookies() if clean: organizer.cleanup() + await async_client.aclose() return organizer @swallow_and_print_errors @@ -161,7 +165,7 @@ class Pferd(Location): authenticator = Pferd._get_authenticator(username=username, password=password) PRETTY.starting_synchronizer(target, "ILIAS", course_id) - organizer = self._ilias( + 
organizer = asyncio.run(self._ilias( target=target, base_url="https://ilias.studium.kit.edu/", crawl_function=lambda crawler: crawler.crawl_course(course_id), @@ -173,7 +177,7 @@ class Pferd(Location): clean=clean, timeout=timeout, file_conflict_resolver=file_conflict_resolver - ) + )) self._download_summary.merge(organizer.download_summary) @@ -230,7 +234,7 @@ class Pferd(Location): authenticator = Pferd._get_authenticator(username, password) PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop") - organizer = self._ilias( + organizer = asyncio.run(self._ilias( target=target, base_url="https://ilias.studium.kit.edu/", crawl_function=lambda crawler: crawler.crawl_personal_desktop(), @@ -242,7 +246,7 @@ class Pferd(Location): clean=clean, timeout=timeout, file_conflict_resolver=file_conflict_resolver - ) + )) self._download_summary.merge(organizer.download_summary) @@ -298,7 +302,7 @@ class Pferd(Location): if not full_url.startswith("https://ilias.studium.kit.edu"): raise FatalException("Not a valid KIT ILIAS URL") - organizer = self._ilias( + organizer = asyncio.run(self._ilias( target=target, base_url="https://ilias.studium.kit.edu/", crawl_function=lambda crawler: crawler.recursive_crawl_url(full_url), @@ -310,7 +314,7 @@ class Pferd(Location): clean=clean, timeout=timeout, file_conflict_resolver=file_conflict_resolver - ) + )) self._download_summary.merge(organizer.download_summary)