Use shared ILIASCrawler

This commit is contained in:
be7a 2021-04-28 01:50:28 +02:00
parent 54a446c43c
commit c0ba8b9528
No known key found for this signature in database
GPG key ID: 6510870A77F49A99
21 changed files with 734 additions and 510 deletions

View file

@ -40,7 +40,7 @@ class CookieJar:
except (FileNotFoundError, LoadError): except (FileNotFoundError, LoadError):
LOGGER.warning( LOGGER.warning(
"No valid cookie file found at %s, continuing with no cookies", "No valid cookie file found at %s, continuing with no cookies",
self._cookies.filename self._cookies.filename,
) )
def save_cookies(self, reason: Optional[str] = None) -> None: def save_cookies(self, reason: Optional[str] = None) -> None:
@ -69,6 +69,5 @@ class CookieJar:
def create_async_client(self) -> httpx.AsyncClient: def create_async_client(self) -> httpx.AsyncClient:
"""Create a new async client using the cookie jar.""" """Create a new async client using the cookie jar."""
# TODO: timeout=None was the default behaviour of requests. An approprite value should probably be set # TODO: timeout=None was the default behaviour of requests. An approprite value should probably be set
client = httpx.AsyncClient(timeout=None) client = httpx.AsyncClient(timeout=None, cookies=self.cookies)
client.cookies = self.cookies
return client return client

View file

@ -25,6 +25,7 @@ class DivaDownloadInfo(Transformable):
""" """
Information about a DIVA video Information about a DIVA video
""" """
url: str url: str
@ -49,7 +50,9 @@ class DivaPlaylistCrawler:
""" """
_PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/" _PLAYLIST_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/detail/"
_COLLECTION_BASE_URL = "https://mediaservice.bibliothek.kit.edu/asset/collection.json" _COLLECTION_BASE_URL = (
"https://mediaservice.bibliothek.kit.edu/asset/collection.json"
)
def __init__(self, playlist_id: str): def __init__(self, playlist_id: str):
self._id = playlist_id self._id = playlist_id
@ -108,15 +111,16 @@ class DivaPlaylistCrawler:
title = video["title"] title = video["title"]
collection_title = self._follow_path(["collection", "title"], video) collection_title = self._follow_path(["collection", "title"], video)
url = self._follow_path( url = self._follow_path(
["resourceList", "derivateList", "mp4", "url"], ["resourceList", "derivateList", "mp4", "url"], video
video
) )
if url and collection_title and title: if url and collection_title and title:
path = Path(collection_title, title + ".mp4") path = Path(collection_title, title + ".mp4")
download_infos.append(DivaDownloadInfo(path, url)) download_infos.append(DivaDownloadInfo(path, url))
else: else:
PRETTY.warning(f"Incomplete video found: {title!r} {collection_title!r} {url!r}") PRETTY.warning(
f"Incomplete video found: {title!r} {collection_title!r} {url!r}"
)
return download_infos return download_infos
@ -139,7 +143,9 @@ class DivaDownloader:
A downloader for DIVA videos. A downloader for DIVA videos.
""" """
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy): def __init__(
self, tmp_dir: TmpDir, organizer: Organizer, strategy: DivaDownloadStrategy
):
self._tmp_dir = tmp_dir self._tmp_dir = tmp_dir
self._organizer = organizer self._organizer = organizer
self._strategy = strategy self._strategy = strategy
@ -166,4 +172,6 @@ class DivaDownloader:
stream_to_path(response, tmp_file, info.path.name) stream_to_path(response, tmp_file, info.path.name)
self._organizer.accept_file(tmp_file, info.path) self._organizer.accept_file(tmp_file, info.path)
else: else:
PRETTY.warning(f"Could not download file, got response {response.status_code}") PRETTY.warning(
f"Could not download file, got response {response.status_code}"
)

View file

@ -42,13 +42,17 @@ class DownloadSummary:
""" """
return self._deleted_files.copy() return self._deleted_files.copy()
def merge(self, summary: 'DownloadSummary') -> None: def merge(self, summary: "DownloadSummary") -> None:
""" """
Merges ourselves with the passed summary. Modifies this object, but not the passed one. Merges ourselves with the passed summary. Modifies this object, but not the passed one.
""" """
self._new_files = _mergeNoDuplicate(self._new_files, summary.new_files) self._new_files = _mergeNoDuplicate(self._new_files, summary.new_files)
self._modified_files = _mergeNoDuplicate(self._modified_files, summary.modified_files) self._modified_files = _mergeNoDuplicate(
self._deleted_files = _mergeNoDuplicate(self._deleted_files, summary.deleted_files) self._modified_files, summary.modified_files
)
self._deleted_files = _mergeNoDuplicate(
self._deleted_files, summary.deleted_files
)
def add_deleted_file(self, path: Path) -> None: def add_deleted_file(self, path: Path) -> None:
""" """

View file

@ -65,4 +65,6 @@ class HttpDownloader:
self._organizer.accept_file(tmp_file, info.path) self._organizer.accept_file(tmp_file, info.path)
else: else:
# TODO use proper exception # TODO use proper exception
raise Exception(f"Could not download file, got response {response.status_code}") raise Exception(
f"Could not download file, got response {response.status_code}"
)

View file

@ -19,13 +19,14 @@ class FatalException(Exception):
""" """
TFun = TypeVar('TFun', bound=Callable[..., Any]) TFun = TypeVar("TFun", bound=Callable[..., Any])
def swallow_and_print_errors(function: TFun) -> TFun: def swallow_and_print_errors(function: TFun) -> TFun:
""" """
Decorates a function, swallows all errors, logs them and returns none if one occurred. Decorates a function, swallows all errors, logs them and returns none if one occurred.
""" """
def inner(*args: Any, **kwargs: Any) -> Any: def inner(*args: Any, **kwargs: Any) -> Any:
# pylint: disable=broad-except # pylint: disable=broad-except
try: try:
@ -36,6 +37,7 @@ def swallow_and_print_errors(function: TFun) -> TFun:
except Exception as error: except Exception as error:
Console().print_exception() Console().print_exception()
return None return None
return cast(TFun, inner) return cast(TFun, inner)
@ -43,6 +45,7 @@ def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TF
""" """
Decorates a function and retries it on any exception until the max retries count is hit. Decorates a function and retries it on any exception until the max retries count is hit.
""" """
def retry(function: TFun) -> TFun: def retry(function: TFun) -> TFun:
def inner(*args: Any, **kwargs: Any) -> Any: def inner(*args: Any, **kwargs: Any) -> Any:
for i in range(0, max_retries): for i in range(0, max_retries):
@ -52,6 +55,9 @@ def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TF
except IOError as error: except IOError as error:
PRETTY.warning(f"Error duing operation '{message}': {error}") PRETTY.warning(f"Error duing operation '{message}': {error}")
PRETTY.warning( PRETTY.warning(
f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}") f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}"
)
return cast(TFun, inner) return cast(TFun, inner)
return retry return retry

View file

@ -3,8 +3,18 @@ Synchronizing files from ILIAS instances (https://www.ilias.de/).
""" """
from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator from .authenticators import IliasAuthenticator, KitShibbolethAuthenticator
from .crawler import (IliasCrawler, IliasCrawlerEntry, IliasDirectoryFilter, from .crawler import (
IliasElementType) IliasCrawler,
from .downloader import (IliasDownloader, IliasDownloadInfo, IliasCrawlerEntry,
IliasDownloadStrategy, download_everything, IliasDirectoryFilter,
download_modified_or_new) IliasElementType,
)
from .downloader import (
IliasDownloader,
IliasDownloadInfo,
IliasDownloadStrategy,
download_everything,
download_modified_or_new,
)
from .syncronizer import IliasSycronizer, ResultContainer

View file

@ -84,7 +84,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
"_eventId_proceed": "", "_eventId_proceed": "",
"j_username": self._auth.username, "j_username": self._auth.username,
"j_password": self._auth.password, "j_password": self._auth.password,
"csrf_token": csrf_token "csrf_token": csrf_token,
} }
soup = soupify(await client.post(url, data=data)) soup = soupify(await client.post(url, data=data))
@ -108,9 +108,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
await client.post(url, data=data) await client.post(url, data=data)
async def _authenticate_tfa( async def _authenticate_tfa(
self, self, client: httpx.AsyncClient, soup: bs4.BeautifulSoup
client: httpx.AsyncClient,
soup: bs4.BeautifulSoup
) -> bs4.BeautifulSoup: ) -> bs4.BeautifulSoup:
# Searching the form here so that this fails before asking for # Searching the form here so that this fails before asking for
# credentials rather than after asking. # credentials rather than after asking.
@ -121,10 +119,7 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO # https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth with TFA token") LOGGER.debug("Attempt to log in to Shibboleth with TFA token")
url = "https://idp.scc.kit.edu" + action url = "https://idp.scc.kit.edu" + action
data = { data = {"_eventId_proceed": "", "j_tokenNumber": self._tfa_auth.get_token()}
"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()
}
return soupify(await client.post(url, data=data)) return soupify(await client.post(url, data=data))
@staticmethod @staticmethod

View file

@ -2,20 +2,18 @@
Contains an ILIAS crawler alongside helper functions. Contains an ILIAS crawler alongside helper functions.
""" """
from asyncio.queues import Queue
import datetime import datetime
import json import json
import logging import logging
import re import re
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Awaitable, Dict, List, Optional, Union from typing import Any, Callable, Awaitable, Dict, List, Optional, Union, Tuple
from urllib.parse import (parse_qs, urlencode, urljoin, urlparse, urlsplit, from urllib.parse import parse_qs, urlencode, urljoin, urlparse, urlsplit, urlunsplit
urlunsplit)
import asyncio
import bs4 import bs4
import httpx import httpx
import asyncio
from ..errors import FatalException, retry_on_io_exception from ..errors import FatalException, retry_on_io_exception
from ..logging import PrettyLogger from ..logging import PrettyLogger
@ -32,10 +30,23 @@ def _sanitize_path_name(name: str) -> str:
return name.replace("/", "-").replace("\\", "-") return name.replace("/", "-").replace("\\", "-")
class ResultContainer:
def __init__(self):
self._results = []
def add_result(self, result: IliasDownloadInfo):
self._results.append(result)
def get_results(self) -> List[IliasDownloadInfo]:
return self._results
class IliasElementType(Enum): class IliasElementType(Enum):
""" """
The type of an ilias element. The type of an ilias element.
""" """
COURSE = "COURSE"
REGULAR_FOLDER = "REGULAR_FOLDER" REGULAR_FOLDER = "REGULAR_FOLDER"
VIDEO_FOLDER = "VIDEO_FOLDER" VIDEO_FOLDER = "VIDEO_FOLDER"
EXERCISE_FOLDER = "EXERCISE_FOLDER" EXERCISE_FOLDER = "EXERCISE_FOLDER"
@ -55,6 +66,17 @@ class IliasElementType(Enum):
IliasDirectoryFilter = Callable[[Path, IliasElementType], bool] IliasDirectoryFilter = Callable[[Path, IliasElementType], bool]
class InvalidCourseError(FatalException):
"""
A invalid Course ID was encountered
"""
def __init__(course_id: str):
super(
f"Invalid course id {course_id}? I didn't find anything looking like a course!"
)
class IliasCrawlerEntry: class IliasCrawlerEntry:
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
""" """
@ -66,11 +88,10 @@ class IliasCrawlerEntry:
path: Path, path: Path,
url: Union[str, Callable[[], Awaitable[Optional[str]]]], url: Union[str, Callable[[], Awaitable[Optional[str]]]],
entry_type: IliasElementType, entry_type: IliasElementType,
modification_date: Optional[datetime.datetime] modification_date: Optional[datetime.datetime],
): ):
self.path = path self.path = path
if isinstance(url, str): if isinstance(url, str):
# TODO: Dirty hack, remove
future = asyncio.Future() future = asyncio.Future()
future.set_result(url) future.set_result(url)
self.url: Callable[[], Awaitable[Optional[str]]] = lambda: future self.url: Callable[[], Awaitable[Optional[str]]] = lambda: future
@ -84,7 +105,10 @@ class IliasCrawlerEntry:
Converts this crawler entry to an IliasDownloadInfo, if possible. Converts this crawler entry to an IliasDownloadInfo, if possible.
This method will only succeed for *File* types. This method will only succeed for *File* types.
""" """
if self.entry_type in [IliasElementType.REGULAR_FILE, IliasElementType.VIDEO_FILE]: if self.entry_type in [
IliasElementType.REGULAR_FILE,
IliasElementType.VIDEO_FILE,
]:
return IliasDownloadInfo(self.path, self.url, self.modification_date) return IliasDownloadInfo(self.path, self.url, self.modification_date)
return None return None
@ -102,12 +126,11 @@ class IliasCrawler:
base_url: str, base_url: str,
client: httpx.AsyncClient, client: httpx.AsyncClient,
authenticator: IliasAuthenticator, authenticator: IliasAuthenticator,
dir_filter: IliasDirectoryFilter dir_filter: IliasDirectoryFilter,
): ):
""" """
Create a new ILIAS crawler. Create a new ILIAS crawler.
""" """
self._base_url = base_url self._base_url = base_url
self._client = client self._client = client
self._authenticator = authenticator self._authenticator = authenticator
@ -125,52 +148,31 @@ class IliasCrawler:
return urlunsplit((scheme, netloc, path, new_query_string, fragment)) return urlunsplit((scheme, netloc, path, new_query_string, fragment))
async def recursive_crawl_url(self, url: str) -> List[IliasDownloadInfo]: async def recursive_crawl_url(self, url: str) -> IliasCrawlerEntry:
""" """
Crawls a given url *and all reachable elements in it*. Creates a crawl target for a given url *and all reachable elements in it*.
Args: Args:
url {str} -- the *full* url to crawl url {str} -- the *full* url to crawl
""" """
start_entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), url)
return await self._iterate_entries_to_download_infos(start_entries)
async def crawl_course(self, course_id: str) -> List[IliasDownloadInfo]: return IliasCrawlerEntry(Path(""), url, IliasElementType.REGULAR_FOLDER, None)
async def crawl_course(self, course_id: str) -> IliasCrawlerEntry:
""" """
Starts the crawl process for a course, yielding a list of elements to (potentially) Creates a crawl target for a course, yielding a list of elements to (potentially)
download. download.
Arguments: Arguments:
course_id {str} -- the course id course_id {str} -- the course id
Raises:
FatalException: if an unrecoverable error occurs or the course id is not valid
""" """
# Start crawling at the given course # Start crawling at the given course
root_url = self._url_set_query_param( root_url = self._url_set_query_param(
self._base_url + "/goto.php", "target", f"crs_{course_id}" self._base_url + "/goto.php", "target", f"crs_{course_id}"
) )
if not await self._is_course_id_valid(root_url, course_id): return IliasCrawlerEntry(Path(""), root_url, IliasElementType.COURSE, None)
raise FatalException(
"Invalid course id? I didn't find anything looking like a course!"
)
# And treat it as a folder
entries: List[IliasCrawlerEntry] = await self._crawl_folder(Path(""), root_url)
return await self._iterate_entries_to_download_infos(entries)
async def _is_course_id_valid(self, root_url: str, course_id: str) -> bool:
response: httpx.Response = await self._client.get(root_url)
# We were redirected ==> Non-existant ID
if course_id not in str(response.url):
return False
link_element: bs4.Tag = (await self._get_page(root_url, {})).find(id="current_perma_link")
if not link_element:
return False
# It wasn't a course but a category list, forum, etc.
return "crs_" in link_element.get("value")
async def find_course_name(self, course_id: str) -> Optional[str]: async def find_course_name(self, course_id: str) -> Optional[str]:
""" """
@ -186,26 +188,28 @@ class IliasCrawler:
""" """
Returns the name of the element at the given URL, if it can find one. Returns the name of the element at the given URL, if it can find one.
""" """
focus_element: bs4.Tag = await self._get_page(url, {}).find(id="il_mhead_t_focus") focus_element: bs4.Tag = await self._get_page(url, {}).find(
id="il_mhead_t_focus"
)
if not focus_element: if not focus_element:
return None return None
return focus_element.text return focus_element.text
async def crawl_personal_desktop(self) -> List[IliasDownloadInfo]: async def crawl_personal_desktop(self) -> IliasCrawlerEntry:
""" """
Crawls the ILIAS personal desktop (and every subelements that can be reached from there). Creates a crawl target for the ILIAS personal desktop (and every subelements that can be reached from there).
download.
Raises:
FatalException: if an unrecoverable error occurs
""" """
entries: List[IliasCrawlerEntry] = await self._crawl_folder( return IliasCrawlerEntry(
Path(""), self._base_url + "?baseClass=ilPersonalDesktopGUI" Path(""),
self._base_url + "?baseClass=ilPersonalDesktopGUI",
IliasElementType.REGULAR_FOLDER,
None,
) )
return await self._iterate_entries_to_download_infos(entries)
async def _crawl_worker(self, entries_to_process: asyncio.Queue, result: List[IliasDownloadInfo]): async def _crawl_worker(self, entries_to_process: asyncio.Queue):
while True: while True:
entry = await entries_to_process.get() (entry, results) = await entries_to_process.get()
if entry.entry_type == IliasElementType.EXTERNAL_LINK: if entry.entry_type == IliasElementType.EXTERNAL_LINK:
PRETTY.not_searching(entry.path, "external link") PRETTY.not_searching(entry.path, "external link")
@ -216,21 +220,25 @@ class IliasCrawler:
entries_to_process.task_done() entries_to_process.task_done()
continue continue
if entry.entry_type.is_folder() and not self.dir_filter(entry.path, entry.entry_type): if entry.entry_type.is_folder() and not self.dir_filter(
entry.path, entry.entry_type
):
PRETTY.not_searching(entry.path, "user filter") PRETTY.not_searching(entry.path, "user filter")
entries_to_process.task_done() entries_to_process.task_done()
continue continue
download_info = entry.to_download_info() download_info = entry.to_download_info()
if download_info is not None: if download_info is not None:
result.append(download_info) results.add_result(download_info)
entries_to_process.task_done() entries_to_process.task_done()
continue continue
url = await entry.url() url = await entry.url()
if url is None: if url is None:
PRETTY.warning(f"Could not find url for {str(entry.path)!r}, skipping it") PRETTY.warning(
f"Could not find url for {str(entry.path)!r}, skipping it"
)
entries_to_process.task_done() entries_to_process.task_done()
continue continue
@ -238,37 +246,46 @@ class IliasCrawler:
if entry.entry_type == IliasElementType.EXERCISE_FOLDER: if entry.entry_type == IliasElementType.EXERCISE_FOLDER:
for task in await self._crawl_exercises(entry.path, url): for task in await self._crawl_exercises(entry.path, url):
entries_to_process.put_nowait(task) entries_to_process.put_nowait((task, results))
entries_to_process.task_done() entries_to_process.task_done()
continue continue
if entry.entry_type == IliasElementType.REGULAR_FOLDER: if entry.entry_type == IliasElementType.REGULAR_FOLDER:
for task in await self._crawl_folder(entry.path, url): for task in await self._crawl_folder(entry.path, url):
entries_to_process.put_nowait(task) entries_to_process.put_nowait((task, results))
entries_to_process.task_done()
continue
if entry.entry_type == IliasElementType.COURSE:
for task in await self._crawl_folder(
entry.path, url, url.split("crs_")[1]
):
entries_to_process.put_nowait((task, results))
entries_to_process.task_done() entries_to_process.task_done()
continue continue
if entry.entry_type == IliasElementType.VIDEO_FOLDER: if entry.entry_type == IliasElementType.VIDEO_FOLDER:
for task in await self._crawl_video_directory(entry.path, url): for task in await self._crawl_video_directory(entry.path, url):
entries_to_process.put_nowait(task) entries_to_process.put_nowait((task, results))
entries_to_process.task_done() entries_to_process.task_done()
continue continue
PRETTY.warning(f"Unknown type: {entry.entry_type}!") PRETTY.warning(f"Unknown type: {entry.entry_type}!")
async def iterate_entries_to_download_infos(
async def _iterate_entries_to_download_infos( self, entries: List[Tuple[IliasCrawlerEntry, ResultContainer]]
self, ):
entries: List[IliasCrawlerEntry]
) -> List[IliasDownloadInfo]:
result: List[IliasDownloadInfo] = []
crawl_queue = asyncio.Queue() crawl_queue = asyncio.Queue()
# Setup authentication locks
self._auth_event = asyncio.Event()
self._auth_lock = asyncio.Lock()
for entry in entries: for entry in entries:
crawl_queue.put_nowait(entry) crawl_queue.put_nowait(entry)
workers = [] workers = []
# TODO: Find proper worker limit # TODO: Find proper worker limit
for _ in range(10): for _ in range(20):
worker = asyncio.create_task(self._crawl_worker(crawl_queue, result)) worker = asyncio.create_task(self._crawl_worker(crawl_queue))
workers.append(worker) workers.append(worker)
await crawl_queue.join() await crawl_queue.join()
@ -278,13 +295,22 @@ class IliasCrawler:
# Wait until all worker tasks are cancelled. # Wait until all worker tasks are cancelled.
await asyncio.gather(*workers, return_exceptions=True) await asyncio.gather(*workers, return_exceptions=True)
return result
async def _crawl_folder(self, folder_path: Path, url: str) -> List[IliasCrawlerEntry]: async def _crawl_folder(
self, folder_path: Path, url: str, course: Optional[str] = None
) -> List[IliasCrawlerEntry]:
""" """
Crawl all files in a folder-like element. Crawl all files in a folder-like element.
Raises a InvalidCourseError if the folder is a non existent course.
""" """
soup = await self._get_page(url, {}) soup = await self._get_page(url, {}, check_course_id_valid=course)
if course is not None:
link_element: bs4.Tag = soup.find(id="current_perma_link")
# It wasn't a course but a category list, forum, etc.
if not link_element or "crs_" not in link_element.get("value"):
raise InvalidCourseError(course)
if soup.find(id="headerimage"): if soup.find(id="headerimage"):
element: bs4.Tag = soup.find(id="headerimage") element: bs4.Tag = soup.find(id="headerimage")
@ -301,7 +327,9 @@ class IliasCrawler:
links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle") links: List[bs4.Tag] = soup.select("a.il_ContainerItemTitle")
for link in links: for link in links:
abs_url = self._abs_url_from_link(link) abs_url = self._abs_url_from_link(link)
element_path = Path(folder_path, _sanitize_path_name(link.getText().strip())) element_path = Path(
folder_path, _sanitize_path_name(link.getText().strip())
)
element_type = self._find_type_from_link(element_path, link, abs_url) element_type = self._find_type_from_link(element_path, link, abs_url)
if element_type == IliasElementType.REGULAR_FILE: if element_type == IliasElementType.REGULAR_FILE:
@ -312,18 +340,24 @@ class IliasCrawler:
date_portion = demangle_date(date_portion_str) date_portion = demangle_date(date_portion_str)
if not date_portion: if not date_portion:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)] result += [
IliasCrawlerEntry(element_path, abs_url, element_type, None)
]
continue continue
rest_of_name = meeting_name rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str): if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str):] rest_of_name = rest_of_name[len(date_portion_str) :]
new_name = datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M") \ new_name = (
datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M")
+ rest_of_name + rest_of_name
)
new_path = Path(folder_path, _sanitize_path_name(new_name)) new_path = Path(folder_path, _sanitize_path_name(new_name))
result += [ result += [
IliasCrawlerEntry(new_path, abs_url, IliasElementType.REGULAR_FOLDER, None) IliasCrawlerEntry(
new_path, abs_url, IliasElementType.REGULAR_FOLDER, None
)
] ]
elif element_type is not None: elif element_type is not None:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)] result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
@ -340,9 +374,7 @@ class IliasCrawler:
@staticmethod @staticmethod
def _find_type_from_link( def _find_type_from_link(
path: Path, path: Path, link_element: bs4.Tag, url: str
link_element: bs4.Tag,
url: str
) -> Optional[IliasElementType]: ) -> Optional[IliasElementType]:
""" """
Decides which sub crawler to use for a given top level element. Decides which sub crawler to use for a given top level element.
@ -370,7 +402,9 @@ class IliasCrawler:
return None return None
@staticmethod @staticmethod
def _find_type_from_folder_like(link_element: bs4.Tag, url: str) -> Optional[IliasElementType]: def _find_type_from_folder_like(
link_element: bs4.Tag, url: str
) -> Optional[IliasElementType]:
""" """
Try crawling something that looks like a folder. Try crawling something that looks like a folder.
""" """
@ -414,7 +448,9 @@ class IliasCrawler:
return IliasElementType.REGULAR_FOLDER return IliasElementType.REGULAR_FOLDER
@staticmethod @staticmethod
def _crawl_file(path: Path, link_element: bs4.Tag, url: str) -> List[IliasCrawlerEntry]: def _crawl_file(
path: Path, link_element: bs4.Tag, url: str
) -> List[IliasCrawlerEntry]:
""" """
Crawls a file. Crawls a file.
""" """
@ -425,14 +461,16 @@ class IliasCrawler:
"div", {"class": lambda x: "il_ContainerListItem" in x} "div", {"class": lambda x: "il_ContainerListItem" in x}
).select_one(".il_ItemProperties") ).select_one(".il_ItemProperties")
# The first one is always the filetype # The first one is always the filetype
file_type = properties_parent.select_one("span.il_ItemProperty").getText().strip() file_type = (
properties_parent.select_one("span.il_ItemProperty").getText().strip()
)
# The rest does not have a stable order. Grab the whole text and reg-ex the date # The rest does not have a stable order. Grab the whole text and reg-ex the date
# out of it # out of it
all_properties_text = properties_parent.getText().strip() all_properties_text = properties_parent.getText().strip()
modification_date_match = re.search( modification_date_match = re.search(
r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)", r"(((\d+\. \w+ \d+)|(Gestern|Yesterday)|(Heute|Today)|(Morgen|Tomorrow)), \d+:\d+)",
all_properties_text all_properties_text,
) )
if modification_date_match is None: if modification_date_match is None:
modification_date = None modification_date = None
@ -446,10 +484,14 @@ class IliasCrawler:
full_path = Path(path, name + "." + file_type) full_path = Path(path, name + "." + file_type)
return [ return [
IliasCrawlerEntry(full_path, url, IliasElementType.REGULAR_FILE, modification_date) IliasCrawlerEntry(
full_path, url, IliasElementType.REGULAR_FILE, modification_date
)
] ]
async def _crawl_video_directory(self, video_dir_path: Path, url: str) -> List[IliasCrawlerEntry]: async def _crawl_video_directory(
self, video_dir_path: Path, url: str
) -> List[IliasCrawlerEntry]:
""" """
Crawl the video overview site. Crawl the video overview site.
""" """
@ -462,7 +504,7 @@ class IliasCrawler:
# in a standalone html page # in a standalone html page
video_list_soup = await self._get_page( video_list_soup = await self._get_page(
self._abs_url_from_link(content_link), self._abs_url_from_link(content_link),
{"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} {"limit": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"},
) )
# If we find a page selected, we probably need to respect pagination # If we find a page selected, we probably need to respect pagination
@ -483,7 +525,7 @@ class IliasCrawler:
self, self,
video_dir_path: Path, video_dir_path: Path,
paged_video_list_soup: bs4.BeautifulSoup, paged_video_list_soup: bs4.BeautifulSoup,
second_stage_url: str second_stage_url: str,
) -> List[IliasCrawlerEntry]: ) -> List[IliasCrawlerEntry]:
LOGGER.info("Found paginated video page, trying 800 elements") LOGGER.info("Found paginated video page, trying 800 elements")
@ -498,7 +540,9 @@ class IliasCrawler:
"Could not increase elements per page (table not found)." "Could not increase elements per page (table not found)."
" Some might not be crawled!" " Some might not be crawled!"
) )
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup) return self._crawl_video_directory_second_stage(
video_dir_path, paged_video_list_soup
)
match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"]) match = re.match(r"tbl_xoct_(.+)", table_element.attrs["id"])
if match is None: if match is None:
@ -506,12 +550,18 @@ class IliasCrawler:
"Could not increase elements per page (table id not found)." "Could not increase elements per page (table id not found)."
" Some might not be crawled!" " Some might not be crawled!"
) )
return self._crawl_video_directory_second_stage(video_dir_path, paged_video_list_soup) return self._crawl_video_directory_second_stage(
video_dir_path, paged_video_list_soup
)
table_id = match.group(1) table_id = match.group(1)
extended_video_page = await self._get_page( extended_video_page = await self._get_page(
second_stage_url, second_stage_url,
{f"tbl_xoct_{table_id}_trows": 800, "cmd": "asyncGetTableGUI", "cmdMode": "asynch"} {
f"tbl_xoct_{table_id}_trows": 800,
"cmd": "asyncGetTableGUI",
"cmdMode": "asynch",
},
) )
if self._is_paginated_video_page(extended_video_page): if self._is_paginated_video_page(extended_video_page):
@ -520,12 +570,12 @@ class IliasCrawler:
" I will miss elements." " I will miss elements."
) )
return self._crawl_video_directory_second_stage(video_dir_path, extended_video_page) return self._crawl_video_directory_second_stage(
video_dir_path, extended_video_page
)
def _crawl_video_directory_second_stage( def _crawl_video_directory_second_stage(
self, self, video_dir_path: Path, video_list_soup: bs4.BeautifulSoup
video_dir_path: Path,
video_list_soup: bs4.BeautifulSoup
) -> List[IliasCrawlerEntry]: ) -> List[IliasCrawlerEntry]:
""" """
Crawls the "second stage" video page. This page contains the actual video urls. Crawls the "second stage" video page. This page contains the actual video urls.
@ -553,24 +603,27 @@ class IliasCrawler:
return results return results
def _crawl_single_video( def _crawl_single_video(
self, self, parent_path: Path, link: bs4.Tag, direct_download: bool
parent_path: Path,
link: bs4.Tag,
direct_download: bool
) -> List[IliasCrawlerEntry]: ) -> List[IliasCrawlerEntry]:
""" """
Crawl a single video based on its "Abspielen" link from the video listing. Crawl a single video based on its "Abspielen" link from the video listing.
""" """
# The link is part of a table with multiple columns, describing metadata. # The link is part of a table with multiple columns, describing metadata.
# 6th child (1 indexed) is the modification time string # 6th child (1 indexed) is the modification time string
modification_string = link.parent.parent.parent.select_one( modification_string = (
"td.std:nth-child(6)" link.parent.parent.parent.select_one("td.std:nth-child(6)")
).getText().strip() .getText()
modification_time = datetime.datetime.strptime(modification_string, "%d.%m.%Y - %H:%M") .strip()
)
modification_time = datetime.datetime.strptime(
modification_string, "%d.%m.%Y - %H:%M"
)
title = link.parent.parent.parent.select_one( title = (
"td.std:nth-child(3)" link.parent.parent.parent.select_one("td.std:nth-child(3)")
).getText().strip() .getText()
.strip()
)
title += ".mp4" title += ".mp4"
video_path: Path = Path(parent_path, _sanitize_path_name(title)) video_path: Path = Path(parent_path, _sanitize_path_name(title))
@ -580,18 +633,27 @@ class IliasCrawler:
# The video had a direct download button we can use instead # The video had a direct download button we can use instead
if direct_download: if direct_download:
LOGGER.debug("Using direct download for video %r", str(video_path)) LOGGER.debug("Using direct download for video %r", str(video_path))
return [IliasCrawlerEntry( return [
video_path, video_url, IliasElementType.VIDEO_FILE, modification_time IliasCrawlerEntry(
)] video_path,
video_url,
IliasElementType.VIDEO_FILE,
modification_time,
)
]
return [IliasCrawlerEntry( return [
IliasCrawlerEntry(
video_path, video_path,
self._crawl_video_url_from_play_link(video_url), self._crawl_video_url_from_play_link(video_url),
IliasElementType.VIDEO_FILE, IliasElementType.VIDEO_FILE,
modification_time modification_time,
)] )
]
def _crawl_video_url_from_play_link(self, play_url: str) -> Callable[[], Awaitable[Optional[str]]]: def _crawl_video_url_from_play_link(
self, play_url: str
) -> Callable[[], Awaitable[Optional[str]]]:
async def inner() -> Optional[str]: async def inner() -> Optional[str]:
# Fetch the actual video page. This is a small wrapper page initializing a javscript # Fetch the actual video page. This is a small wrapper page initializing a javscript
# player. Sadly we can not execute that JS. The actual video stream url is nowhere # player. Sadly we can not execute that JS. The actual video stream url is nowhere
@ -614,9 +676,12 @@ class IliasCrawler:
# and fetch the video url! # and fetch the video url!
video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"] video_url = json_object["streams"][0]["sources"]["mp4"][0]["src"]
return video_url return video_url
return inner return inner
async def _crawl_exercises(self, element_path: Path, url: str) -> List[IliasCrawlerEntry]: async def _crawl_exercises(
self, element_path: Path, url: str
) -> List[IliasCrawlerEntry]:
""" """
Crawl files offered for download in exercises. Crawl files offered for download in exercises.
""" """
@ -625,17 +690,21 @@ class IliasCrawler:
results: List[IliasCrawlerEntry] = [] results: List[IliasCrawlerEntry] = []
# Each assignment is in an accordion container # Each assignment is in an accordion container
assignment_containers: List[bs4.Tag] = soup.select(".il_VAccordionInnerContainer") assignment_containers: List[bs4.Tag] = soup.select(
".il_VAccordionInnerContainer"
)
for container in assignment_containers: for container in assignment_containers:
# Fetch the container name out of the header to use it in the path # Fetch the container name out of the header to use it in the path
container_name = container.select_one(".ilAssignmentHeader").getText().strip() container_name = (
container.select_one(".ilAssignmentHeader").getText().strip()
)
# Find all download links in the container (this will contain all the files) # Find all download links in the container (this will contain all the files)
files: List[bs4.Tag] = container.findAll( files: List[bs4.Tag] = container.findAll(
name="a", name="a",
# download links contain the given command class # download links contain the given command class
attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x}, attrs={"href": lambda x: x and "cmdClass=ilexsubmissiongui" in x},
text="Download" text="Download",
) )
LOGGER.debug("Found exercise container %r", container_name) LOGGER.debug("Found exercise container %r", container_name)
@ -650,30 +719,47 @@ class IliasCrawler:
LOGGER.debug("Found file %r at %r", file_name, url) LOGGER.debug("Found file %r at %r", file_name, url)
results.append(IliasCrawlerEntry( results.append(
IliasCrawlerEntry(
Path(element_path, container_name, file_name), Path(element_path, container_name, file_name),
url, url,
IliasElementType.REGULAR_FILE, IliasElementType.REGULAR_FILE,
None # We do not have any timestamp None, # We do not have any timestamp
)) )
)
return results return results
@retry_on_io_exception(3, "fetching webpage") @retry_on_io_exception(3, "fetching webpage")
async def _get_page(self, url: str, params: Dict[str, Any], async def _get_page(
retry_count: int = 0) -> bs4.BeautifulSoup: self,
url: str,
params: Dict[str, Any],
retry_count: int = 0,
check_course_id_valid: Optional[str] = None,
) -> bs4.BeautifulSoup:
""" """
Fetches a page from ILIAS, authenticating when needed. Fetches a page from ILIAS, authenticating when needed.
Raises a InvalidCourseError if the page is a non existent course.
""" """
if retry_count >= 4: if retry_count >= 4:
raise FatalException("Could not get a proper page after 4 tries. " raise FatalException(
"Could not get a proper page after 4 tries. "
"Maybe your URL is wrong, authentication fails continuously, " "Maybe your URL is wrong, authentication fails continuously, "
"your ILIAS connection is spotty or ILIAS is not well.") "your ILIAS connection is spotty or ILIAS is not well."
)
LOGGER.debug("Fetching %r", url) LOGGER.debug("Fetching %r", url)
response = await self._client.get(url, params=params) response = await self._client.get(url, params=params)
if check_course_id_valid is not None:
# We were redirected ==> Non-existant ID
if check_course_id_valid not in str(response.url):
raise InvalidCourseError(check_course_id_valid)
content_type = response.headers["content-type"] content_type = response.headers["content-type"]
if not content_type.startswith("text/html"): if not content_type.startswith("text/html"):
@ -687,11 +773,23 @@ class IliasCrawler:
if self._is_logged_in(soup): if self._is_logged_in(soup):
return soup return soup
if self._auth_lock.locked():
# Some other future is already logging in
await self._auth_event.wait()
else:
await self._auth_lock.acquire()
self._auth_event.clear()
LOGGER.info("Not authenticated, changing that...") LOGGER.info("Not authenticated, changing that...")
await self._authenticator.authenticate(self._client) await self._authenticator.authenticate(self._client)
self._auth_event.set()
self._auth_lock.release()
return await self._get_page(url, params, retry_count + 1) return await self._get_page(
url,
params,
check_course_id_valid=check_course_id_valid,
retry_count=retry_count + 1,
)
@staticmethod @staticmethod
def _is_logged_in(soup: bs4.BeautifulSoup) -> bool: def _is_logged_in(soup: bs4.BeautifulSoup) -> bool:
@ -705,7 +803,7 @@ class IliasCrawler:
video_table = soup.find( video_table = soup.find(
recursive=True, recursive=True,
name="table", name="table",
attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")} attrs={"id": lambda x: x is not None and x.startswith("tbl_xoct")},
) )
if video_table is not None: if video_table is not None:
LOGGER.debug("Auth: Found #tbl_xoct.+") LOGGER.debug("Auth: Found #tbl_xoct.+")

View file

@ -25,15 +25,19 @@ def demangle_date(date: str) -> Optional[datetime.datetime]:
saved = locale.setlocale(locale.LC_ALL) saved = locale.setlocale(locale.LC_ALL)
try: try:
try: try:
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8') locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
except locale.Error: except locale.Error:
PRETTY.warning( PRETTY.warning(
"Could not set language to german. Assuming you use english everywhere." "Could not set language to german. Assuming you use english everywhere."
) )
date = re.sub(r"\s+", " ", date) date = re.sub(r"\s+", " ", date)
date = re.sub("Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I) date = re.sub(
date = re.sub("Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I) "Gestern|Yesterday", _yesterday().strftime("%d. %b %Y"), date, re.I
)
date = re.sub(
"Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I
)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I) date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I)
return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M") return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M")
except ValueError: except ValueError:

View file

@ -7,9 +7,9 @@ import os
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import Callable, Awaitable, List, Optional, Union from typing import Callable, Awaitable, List, Optional, Union
import asyncio
import bs4 import bs4
import httpx import httpx
import asyncio
from ..errors import retry_on_io_exception from ..errors import retry_on_io_exception
from ..logging import PrettyLogger from ..logging import PrettyLogger
@ -36,7 +36,7 @@ class IliasDownloadInfo(Transformable):
self, self,
path: PurePath, path: PurePath,
url: Union[str, Callable[[], Awaitable[Optional[str]]]], url: Union[str, Callable[[], Awaitable[Optional[str]]]],
modifcation_date: Optional[datetime.datetime] modifcation_date: Optional[datetime.datetime],
): ):
super().__init__(path) super().__init__(path)
if isinstance(url, str): if isinstance(url, str):
@ -87,7 +87,7 @@ class IliasDownloader:
client: httpx.Client, client: httpx.Client,
authenticator: IliasAuthenticator, authenticator: IliasAuthenticator,
strategy: IliasDownloadStrategy, strategy: IliasDownloadStrategy,
timeout: int = 5 timeout: int = 5,
): ):
""" """
Create a new IliasDownloader. Create a new IliasDownloader.
@ -133,7 +133,9 @@ class IliasDownloader:
return True return True
if not await download_impl(): if not await download_impl():
PRETTY.error(f"Download of file {info.path} failed too often! Skipping it...") PRETTY.error(
f"Download of file {info.path} failed too often! Skipping it..."
)
return return
dst_path = self._organizer.accept_file(tmp_file, info.path) dst_path = self._organizer.accept_file(tmp_file, info.path)
@ -142,8 +144,8 @@ class IliasDownloader:
dst_path, dst_path,
times=( times=(
math.ceil(info.modification_date.timestamp()), math.ceil(info.modification_date.timestamp()),
math.ceil(info.modification_date.timestamp()) math.ceil(info.modification_date.timestamp()),
) ),
) )
async def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool: async def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
@ -158,7 +160,9 @@ class IliasDownloader:
if content_type.startswith("text/html") and not has_content_disposition: if content_type.startswith("text/html") and not has_content_disposition:
if self._is_logged_in(soupify(response)): if self._is_logged_in(soupify(response)):
raise ContentTypeException("Attempting to download a web page, not a file") raise ContentTypeException(
"Attempting to download a web page, not a file"
)
return False return False

View file

@ -0,0 +1,80 @@
from typing import Callable, Awaitable, List, Optional
from .authenticators import IliasAuthenticator
from .crawler import (
IliasCrawler,
IliasDirectoryFilter,
IliasCrawlerEntry,
ResultContainer,
)
from ..utils import PathLike, to_path
from ..cookie_jar import CookieJar
class IliasSycronizer:
    """
    Coordinates a single shared ILIAS crawler: owns the HTTP client and
    cookie jar, collects crawl targets, and runs them all in one pass.
    """

    def __init__(
        self,
        base_url: str,
        authenticator: IliasAuthenticator,
        cookies: Optional[PathLike],
        dir_filter: IliasDirectoryFilter,
    ):
        self._authenticator = authenticator
        # Persisted cookies let later runs skip re-authentication.
        self._cookie_jar = CookieJar(to_path(cookies) if cookies else None)
        self._cookie_jar.load_cookies()
        self._client = self._cookie_jar.create_async_client()
        self._crawler = IliasCrawler(
            base_url, self._client, self._authenticator, dir_filter
        )
        # Pairs of (crawl callback, container that will receive its results).
        self._targets = []

    def add_target(
        self,
        crawl_function: Callable[[IliasCrawler], Awaitable[List[IliasCrawlerEntry]]],
    ) -> ResultContainer:
        """
        Register a crawl target and return the ResultContainer that the
        resulting DownloadInfos will be stored in.

        Arguments:
            crawl_function {Callable[[IliasCrawler], Awaitable[List[IliasCrawlerEntry]]]} --
                callback invoked with the shared crawler; must return the
                awaitable list of entries for this target
        """
        container = ResultContainer()
        self._targets.append((crawl_function, container))
        return container

    def get_authenticator(self):
        """Return the authenticator shared by all targets."""
        return self._authenticator

    def get_cookie_jar(self):
        """Return the cookie jar backing the shared client."""
        return self._cookie_jar

    async def close_client(self):
        """Close the shared async HTTP client."""
        await self._client.aclose()

    async def syncronize(self):
        """
        Crawl every registered target and resolve the collected entries
        into download infos, then persist the session cookies.
        """
        crawled = [
            (await crawl_function(self._crawler), container)
            for crawl_function, container in self._targets
        ]
        await self._crawler.iterate_entries_to_download_infos(crawled)
        self._cookie_jar.save_cookies()

View file

@ -31,6 +31,7 @@ class IpdDownloadInfo(Transformable):
""" """
Information about an ipd entry. Information about an ipd entry.
""" """
url: str url: str
modification_date: Optional[datetime.datetime] modification_date: Optional[datetime.datetime]
@ -83,9 +84,16 @@ class IpdCrawler:
items: List[IpdDownloadInfo] = [] items: List[IpdDownloadInfo] = []
def is_relevant_url(x: str) -> bool: def is_relevant_url(x: str) -> bool:
return x.endswith(".pdf") or x.endswith(".c") or x.endswith(".java") or x.endswith(".zip") return (
x.endswith(".pdf")
or x.endswith(".c")
or x.endswith(".java")
or x.endswith(".zip")
)
for link in page.findAll(name="a", attrs={"href": lambda x: x and is_relevant_url(x)}): for link in page.findAll(
name="a", attrs={"href": lambda x: x and is_relevant_url(x)}
):
href: str = link.attrs.get("href") href: str = link.attrs.get("href")
name = href.split("/")[-1] name = href.split("/")[-1]
@ -94,15 +102,19 @@ class IpdCrawler:
enclosing_row: bs4.Tag = link.findParent(name="tr") enclosing_row: bs4.Tag = link.findParent(name="tr")
if enclosing_row: if enclosing_row:
date_text = enclosing_row.find(name="td").text date_text = enclosing_row.find(name="td").text
modification_date = datetime.datetime.strptime(date_text, "%d.%m.%Y") modification_date = datetime.datetime.strptime(
date_text, "%d.%m.%Y"
)
except ValueError: except ValueError:
modification_date = None modification_date = None
items.append(IpdDownloadInfo( items.append(
IpdDownloadInfo(
Path(name), Path(name),
url=self._abs_url_from_link(link), url=self._abs_url_from_link(link),
modification_date=modification_date modification_date=modification_date,
)) )
)
return items return items
@ -112,7 +124,9 @@ class IpdDownloader:
A downloader for ipd files. A downloader for ipd files.
""" """
def __init__(self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy): def __init__(
self, tmp_dir: TmpDir, organizer: Organizer, strategy: IpdDownloadStrategy
):
self._tmp_dir = tmp_dir self._tmp_dir = tmp_dir
self._organizer = organizer self._organizer = organizer
self._strategy = strategy self._strategy = strategy
@ -144,11 +158,13 @@ class IpdDownloader:
dst_path, dst_path,
times=( times=(
math.ceil(info.modification_date.timestamp()), math.ceil(info.modification_date.timestamp()),
math.ceil(info.modification_date.timestamp()) math.ceil(info.modification_date.timestamp()),
) ),
) )
elif response.status_code == 403: elif response.status_code == 403:
raise FatalException("Received 403. Are you not using the KIT VPN?") raise FatalException("Received 403. Are you not using the KIT VPN?")
else: else:
PRETTY.warning(f"Could not download file, got response {response.status_code}") PRETTY.warning(
f"Could not download file, got response {response.status_code}"
)

View file

@ -7,6 +7,7 @@ from pathlib import Path, PurePath
class ResolveException(Exception): class ResolveException(Exception):
"""An exception while resolving a file.""" """An exception while resolving a file."""
# TODO take care of this when doing exception handling # TODO take care of this when doing exception handling

View file

@ -40,9 +40,9 @@ class RichLoggingHandler(logging.Handler):
def __init__(self, level: int) -> None: def __init__(self, level: int) -> None:
super().__init__(level=level) super().__init__(level=level)
self.console = Console(theme=Theme({ self.console = Console(
"logging.level.warning": Style(color="yellow") theme=Theme({"logging.level.warning": Style(color="yellow")})
})) )
self._log_render = LogRender(show_level=True, show_time=False, show_path=False) self._log_render = LogRender(show_level=True, show_time=False, show_path=False)
def emit(self, record: logging.LogRecord) -> None: def emit(self, record: logging.LogRecord) -> None:
@ -81,18 +81,14 @@ class PrettyLogger:
""" """
Print an error message indicating some operation fatally failed. Print an error message indicating some operation fatally failed.
""" """
self.logger.error( self.logger.error(f"[bold red]{message}[/bold red]")
f"[bold red]{message}[/bold red]"
)
def warning(self, message: str) -> None: def warning(self, message: str) -> None:
""" """
Print a warning message indicating some operation failed, but the error can be recovered Print a warning message indicating some operation failed, but the error can be recovered
or ignored. or ignored.
""" """
self.logger.warning( self.logger.warning(f"[bold yellow]{message}[/bold yellow]")
f"[bold yellow]{message}[/bold yellow]"
)
def modified_file(self, path: PathLike) -> None: def modified_file(self, path: PathLike) -> None:
""" """
@ -108,18 +104,14 @@ class PrettyLogger:
A new file has been downloaded. A new file has been downloaded.
""" """
self.logger.info( self.logger.info(f"[bold green]Created {self._format_path(path)}.[/bold green]")
f"[bold green]Created {self._format_path(path)}.[/bold green]"
)
def deleted_file(self, path: PathLike) -> None: def deleted_file(self, path: PathLike) -> None:
""" """
A file has been deleted. A file has been deleted.
""" """
self.logger.info( self.logger.info(f"[bold red]Deleted {self._format_path(path)}.[/bold red]")
f"[bold red]Deleted {self._format_path(path)}.[/bold red]"
)
def ignored_file(self, path: PathLike, reason: str) -> None: def ignored_file(self, path: PathLike, reason: str) -> None:
""" """
@ -127,8 +119,7 @@ class PrettyLogger:
""" """
self.logger.info( self.logger.info(
f"[dim]Ignored {self._format_path(path)} " f"[dim]Ignored {self._format_path(path)} " f"([/dim]{reason}[dim]).[/dim]"
f"([/dim]{reason}[dim]).[/dim]"
) )
def searching(self, path: PathLike) -> None: def searching(self, path: PathLike) -> None:
@ -177,8 +168,10 @@ class PrettyLogger:
subject_str = f"{subject} " if subject else "" subject_str = f"{subject} " if subject else ""
self.logger.info("") self.logger.info("")
self.logger.info(( self.logger.info(
(
f"[bold cyan]Synchronizing " f"[bold cyan]Synchronizing "
f"{subject_str}to {self._format_path(target_directory)} " f"{subject_str}to {self._format_path(target_directory)} "
f"using the {synchronizer_name} synchronizer.[/bold cyan]" f"using the {synchronizer_name} synchronizer.[/bold cyan]"
)) )
)

View file

@ -29,6 +29,7 @@ class ConflictType(Enum):
MARKED_FILE_OVERWRITTEN: A file is written for the second+ time in this run MARKED_FILE_OVERWRITTEN: A file is written for the second+ time in this run
FILE_DELETED: The file was deleted FILE_DELETED: The file was deleted
""" """
FILE_OVERWRITTEN = "overwritten" FILE_OVERWRITTEN = "overwritten"
MARKED_FILE_OVERWRITTEN = "marked_file_overwritten" MARKED_FILE_OVERWRITTEN = "marked_file_overwritten"
FILE_DELETED = "deleted" FILE_DELETED = "deleted"
@ -56,7 +57,9 @@ class FileConflictResolution(Enum):
FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution] FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution]
def resolve_prompt_user(_path: PurePath, conflict: ConflictType) -> FileConflictResolution: def resolve_prompt_user(
_path: PurePath, conflict: ConflictType
) -> FileConflictResolution:
""" """
Resolves conflicts by asking the user if a file was written twice or will be deleted. Resolves conflicts by asking the user if a file was written twice or will be deleted.
""" """
@ -72,7 +75,9 @@ class FileAcceptException(Exception):
class Organizer(Location): class Organizer(Location):
"""A helper for managing downloaded files.""" """A helper for managing downloaded files."""
def __init__(self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user): def __init__(
self, path: Path, conflict_resolver: FileConflictResolver = resolve_prompt_user
):
"""Create a new organizer for a given path.""" """Create a new organizer for a given path."""
super().__init__(path) super().__init__(path)
self._known_files: Set[Path] = set() self._known_files: Set[Path] = set()
@ -98,7 +103,7 @@ class Organizer(Location):
# your path... # your path...
# See: # See:
# https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation # https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation
if os.name == 'nt': if os.name == "nt":
src_absolute = Path("\\\\?\\" + str(src.resolve())) src_absolute = Path("\\\\?\\" + str(src.resolve()))
dst_absolute = Path("\\\\?\\" + str(self.resolve(dst))) dst_absolute = Path("\\\\?\\" + str(self.resolve(dst)))
else: else:
@ -116,7 +121,9 @@ class Organizer(Location):
if self._is_marked(dst): if self._is_marked(dst):
PRETTY.warning(f"File {str(dst_absolute)!r} was already written!") PRETTY.warning(f"File {str(dst_absolute)!r} was already written!")
conflict = ConflictType.MARKED_FILE_OVERWRITTEN conflict = ConflictType.MARKED_FILE_OVERWRITTEN
if self._resolve_conflict("Overwrite file?", dst_absolute, conflict, default=False): if self._resolve_conflict(
"Overwrite file?", dst_absolute, conflict, default=False
):
PRETTY.ignored_file(dst_absolute, "file was written previously") PRETTY.ignored_file(dst_absolute, "file was written previously")
return None return None
@ -201,7 +208,9 @@ class Organizer(Location):
def _delete_file_if_confirmed(self, path: Path) -> None: def _delete_file_if_confirmed(self, path: Path) -> None:
prompt = f"Do you want to delete {path}" prompt = f"Do you want to delete {path}"
if self._resolve_conflict(prompt, path, ConflictType.FILE_DELETED, default=False): if self._resolve_conflict(
prompt, path, ConflictType.FILE_DELETED, default=False
):
self.download_summary.add_deleted_file(path) self.download_summary.add_deleted_file(path)
path.unlink() path.unlink()
else: else:

View file

@ -4,20 +4,35 @@ Convenience functions for using PFERD.
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Callable, Awaitable, List, Optional, Union from typing import List, Optional, Union
import asyncio import asyncio
from .authenticators import UserPassAuthenticator from .authenticators import UserPassAuthenticator
from .cookie_jar import CookieJar from .diva import (
from .diva import (DivaDownloader, DivaDownloadStrategy, DivaPlaylistCrawler, DivaDownloader,
diva_download_new) DivaDownloadStrategy,
DivaPlaylistCrawler,
diva_download_new,
)
from .download_summary import DownloadSummary from .download_summary import DownloadSummary
from .errors import FatalException, swallow_and_print_errors from .errors import FatalException, swallow_and_print_errors
from .ilias import (IliasAuthenticator, IliasCrawler, IliasDirectoryFilter, from .ilias import (
IliasDownloader, IliasDownloadInfo, IliasDownloadStrategy, IliasDirectoryFilter,
KitShibbolethAuthenticator, download_modified_or_new) IliasDownloader,
from .ipd import (IpdCrawler, IpdDownloader, IpdDownloadInfo, IliasDownloadInfo,
IpdDownloadStrategy, ipd_download_new_or_modified) IliasDownloadStrategy,
KitShibbolethAuthenticator,
download_modified_or_new,
IliasSycronizer,
ResultContainer,
)
from .ipd import (
IpdCrawler,
IpdDownloader,
IpdDownloadInfo,
IpdDownloadStrategy,
ipd_download_new_or_modified,
)
from .location import Location from .location import Location
from .logging import PrettyLogger, enable_logging from .logging import PrettyLogger, enable_logging
from .organizer import FileConflictResolver, Organizer, resolve_prompt_user from .organizer import FileConflictResolver, Organizer, resolve_prompt_user
@ -32,6 +47,36 @@ LOGGER = logging.getLogger(__name__)
PRETTY = PrettyLogger(LOGGER) PRETTY = PrettyLogger(LOGGER)
class IliasTarget:
    """
    Bundles the per-target options for one ILIAS crawl and holds a
    reference to the container its results will arrive in.
    """

    def __init__(
        self,
        results: ResultContainer,
        target: PathLike,
        transform: Transform = lambda x: x,
        download_strategy: IliasDownloadStrategy = download_modified_or_new,
        clean: bool = True,
        timeout: int = 5,
        file_conflict_resolver: FileConflictResolver = resolve_prompt_user,
    ):
        # Container filled in by the synchronizer once crawling finishes.
        self.results = results
        # Where on disk the downloaded files should end up.
        self.target = target
        # Download behaviour knobs, applied when this target is processed.
        self.download_strategy = download_strategy
        self.transform = transform
        self.file_conflict_resolver = file_conflict_resolver
        self.timeout = timeout
        self.clean = clean

    def get_results(self) -> List[IliasDownloadInfo]:
        """Return the download infos produced for this crawl target."""
        return self.results.get_results()
class Pferd(Location): class Pferd(Location):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
""" """
@ -40,16 +85,14 @@ class Pferd(Location):
""" """
def __init__( def __init__(
self, self, base_dir: Path, tmp_dir: Path = Path(".tmp"), test_run: bool = False
base_dir: Path,
tmp_dir: Path = Path(".tmp"),
test_run: bool = False
): ):
super().__init__(Path(base_dir)) super().__init__(Path(base_dir))
self._download_summary = DownloadSummary() self._download_summary = DownloadSummary()
self._tmp_dir = TmpDir(self.resolve(tmp_dir)) self._tmp_dir = TmpDir(self.resolve(tmp_dir))
self._test_run = test_run self._test_run = test_run
self._ilias_targets: List[IliasTarget] = []
@staticmethod @staticmethod
def enable_logging() -> None: def enable_logging() -> None:
@ -73,114 +116,167 @@ class Pferd(Location):
inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password) inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password)
return KitShibbolethAuthenticator(inner_auth) return KitShibbolethAuthenticator(inner_auth)
async def _ilias(
self,
target: PathLike,
base_url: str,
crawl_function: Callable[[IliasCrawler], Awaitable[List[IliasDownloadInfo]]],
authenticator: IliasAuthenticator,
cookies: Optional[PathLike],
dir_filter: IliasDirectoryFilter,
transform: Transform,
download_strategy: IliasDownloadStrategy,
timeout: int,
clean: bool = True,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
) -> Organizer:
# pylint: disable=too-many-locals
cookie_jar = CookieJar(to_path(cookies) if cookies else None)
client = cookie_jar.create_client()
async_client = cookie_jar.create_async_client()
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
crawler = IliasCrawler(base_url, async_client, authenticator, dir_filter)
downloader = IliasDownloader(tmp_dir, organizer, client,
authenticator, download_strategy, timeout)
cookie_jar.load_cookies()
info = await crawl_function(crawler)
cookie_jar.save_cookies()
transformed = apply_transform(transform, info)
if self._test_run:
self._print_transformables(transformed)
return organizer
await downloader.download_all(transformed)
cookie_jar.save_cookies()
if clean:
organizer.cleanup()
await async_client.aclose()
return organizer
@swallow_and_print_errors @swallow_and_print_errors
def ilias_kit( def ilias_kit(
self, self,
target: PathLike,
course_id: str,
dir_filter: IliasDirectoryFilter = lambda x, y: True, dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None, cookies: Optional[PathLike] = None,
username: Optional[str] = None, username: Optional[str] = None,
password: Optional[str] = None, password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new, ) -> IliasSycronizer:
clean: bool = True,
timeout: int = 5,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
) -> Organizer:
""" """
Synchronizes a folder with the ILIAS instance of the KIT. Create a ILIAS Sycronizer for the ILIAS instance of the KIT.
Arguments:
target {Path} -- the target path to write the data to
course_id {str} -- the id of the main course page (found in the URL after ref_id
when opening the course homepage)
Keyword Arguments: Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level, these directories and all of their content is skipped. crawler level, these directories and all of their content is skipped.
(default: {lambdax:True}) (default: {lambdax:True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
cookies {Optional[Path]} -- The path to store and load cookies from. cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None}) (default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None}) the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None}) the user. (default: {None})
"""
# This authenticator only works with the KIT ilias instance.
authenticator = Pferd._get_authenticator(username=username, password=password)
return IliasSycronizer(
"https://ilias.studium.kit.edu/", authenticator, cookies, dir_filter
)
def add_ilias_personal_desktop(
self,
ilias: IliasSycronizer,
target: PathLike,
transform: Transform = lambda x: x,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
timeout: int = 5,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user,
):
"""
Add the ILIAS "personal desktop" as a crawl target.
Arguments:
ilias {IliasSycronizer} -- the ILIAS Instance
target {Path} -- the target path to write the data to
Keyword Arguments:
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
download_strategy {DownloadStrategy} -- A function to determine which files need to download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests. be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new}) (default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes. clean {bool} -- Whether to clean up when the method finishes.
timeout {int} -- The download timeout for opencast videos. timeout {int} -- The download timeout for opencast videos. Sadly needed due to a
requests bug.
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
with overwriting or deleting files. The default always asks the user. with overwriting or deleting files. The default always asks the user.
""" """
# This authenticator only works with the KIT ilias instance. results = ilias.add_target(
authenticator = Pferd._get_authenticator(username=username, password=password) lambda crawler: crawler.crawl_personal_desktop(),
PRETTY.starting_synchronizer(target, "ILIAS", course_id) )
target = IliasTarget(
results,
target,
transform,
download_strategy,
clean,
timeout,
file_conflict_resolver,
)
self._ilias_targets.append(target)
organizer = asyncio.run(self._ilias( def add_ilias_folder(
target=target, self,
base_url="https://ilias.studium.kit.edu/", ilias: IliasSycronizer,
crawl_function=lambda crawler: crawler.crawl_course(course_id), target: PathLike,
authenticator=authenticator, course_id: str,
cookies=cookies, transform: Transform = lambda x: x,
dir_filter=dir_filter, download_strategy: IliasDownloadStrategy = download_modified_or_new,
transform=transform, clean: bool = True,
download_strategy=download_strategy, timeout: int = 5,
clean=clean, file_conflict_resolver: FileConflictResolver = resolve_prompt_user,
timeout=timeout, ):
file_conflict_resolver=file_conflict_resolver """
)) Add a course to syncronize
Arguments:
ilias {IliasSycronizer} -- the ILIAS Instance
target {Path} -- the target path to write the data to
course_id {str} -- the id of the main course page (found in the URL after ref_id
when opening the course homepage)
Keyword Arguments:
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
timeout {int} -- The download timeout for opencast videos. Sadly needed due to a
requests bug.
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
with overwriting or deleting files. The default always asks the user.
"""
results = ilias.add_target(
lambda crawler: crawler.crawl_course(course_id),
)
target = IliasTarget(
results,
target,
transform,
download_strategy,
clean,
timeout,
file_conflict_resolver,
)
self._ilias_targets.append(target)
async def _syncronize_ilias(self, ilias: IliasSycronizer):
await ilias.syncronize()
cookie_jar = ilias.get_cookie_jar()
cookie_jar.save_cookies()
authenticator = ilias.get_authenticator()
client = cookie_jar.create_client()
for entry in self._ilias_targets:
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(
self.resolve(to_path(entry.target)), entry.file_conflict_resolver
)
downloader = IliasDownloader(
tmp_dir,
organizer,
client,
authenticator,
entry.download_strategy,
entry.timeout,
)
transformed = apply_transform(entry.transform, entry.get_results())
if self._test_run:
self._print_transformables(transformed)
return organizer
await downloader.download_all(transformed)
if entry.clean:
organizer.cleanup()
self._download_summary.merge(organizer.download_summary) self._download_summary.merge(organizer.download_summary)
return organizer await ilias.close_client()
def syncronize_ilias(self, ilias: IliasSycronizer):
"""
Syncronize a given ilias instance
Arguments:
ilias {IliasSycronizer} -- the ILIAS Instance
"""
asyncio.run(self._syncronize_ilias(ilias))
def print_summary(self) -> None: def print_summary(self) -> None:
""" """
@ -188,136 +284,6 @@ class Pferd(Location):
""" """
PRETTY.summary(self._download_summary) PRETTY.summary(self._download_summary)
@swallow_and_print_errors
def ilias_kit_personal_desktop(
self,
target: PathLike,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
timeout: int = 5,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
) -> Organizer:
"""
Synchronizes a folder with the ILIAS instance of the KIT. This method will crawl the ILIAS
"personal desktop" instead of a single course.
Arguments:
target {Path} -- the target path to write the data to
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level, these directories and all of their content is skipped.
(default: {lambdax:True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
timeout {int} -- The download timeout for opencast videos.
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
with overwriting or deleting files. The default always asks the user.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = Pferd._get_authenticator(username, password)
PRETTY.starting_synchronizer(target, "ILIAS", "Personal Desktop")
organizer = asyncio.run(self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.crawl_personal_desktop(),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
timeout=timeout,
file_conflict_resolver=file_conflict_resolver
))
self._download_summary.merge(organizer.download_summary)
return organizer
@swallow_and_print_errors
def ilias_kit_folder(
self,
target: PathLike,
full_url: str,
dir_filter: IliasDirectoryFilter = lambda x, y: True,
transform: Transform = lambda x: x,
cookies: Optional[PathLike] = None,
username: Optional[str] = None,
password: Optional[str] = None,
download_strategy: IliasDownloadStrategy = download_modified_or_new,
clean: bool = True,
timeout: int = 5,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user
) -> Organizer:
"""
Synchronizes a folder with a given folder on the ILIAS instance of the KIT.
Arguments:
target {Path} -- the target path to write the data to
full_url {str} -- the full url of the folder/videos/course to crawl
Keyword Arguments:
dir_filter {IliasDirectoryFilter} -- A filter for directories. Will be applied on the
crawler level, these directories and all of their content is skipped.
(default: {lambdax:True})
transform {Transform} -- A transformation function for the output paths. Return None
to ignore a file. (default: {lambdax:x})
cookies {Optional[Path]} -- The path to store and load cookies from.
(default: {None})
username {Optional[str]} -- The SCC username. If none is given, it will prompt
the user. (default: {None})
password {Optional[str]} -- The SCC password. If none is given, it will prompt
the user. (default: {None})
download_strategy {DownloadStrategy} -- A function to determine which files need to
be downloaded. Can save bandwidth and reduce the number of requests.
(default: {download_modified_or_new})
clean {bool} -- Whether to clean up when the method finishes.
timeout {int} -- The download timeout for opencast videos.
file_conflict_resolver {FileConflictResolver} -- A function specifying how to deal
with overwriting or deleting files. The default always asks the user.
"""
# This authenticator only works with the KIT ilias instance.
authenticator = Pferd._get_authenticator(username=username, password=password)
PRETTY.starting_synchronizer(target, "ILIAS", "An ILIAS element by url")
if not full_url.startswith("https://ilias.studium.kit.edu"):
raise FatalException("Not a valid KIT ILIAS URL")
organizer = asyncio.run(self._ilias(
target=target,
base_url="https://ilias.studium.kit.edu/",
crawl_function=lambda crawler: crawler.recursive_crawl_url(full_url),
authenticator=authenticator,
cookies=cookies,
dir_filter=dir_filter,
transform=transform,
download_strategy=download_strategy,
clean=clean,
timeout=timeout,
file_conflict_resolver=file_conflict_resolver
))
self._download_summary.merge(organizer.download_summary)
return organizer
@swallow_and_print_errors @swallow_and_print_errors
def ipd_kit( def ipd_kit(
self, self,
@ -326,7 +292,7 @@ class Pferd(Location):
transform: Transform = lambda x: x, transform: Transform = lambda x: x,
download_strategy: IpdDownloadStrategy = ipd_download_new_or_modified, download_strategy: IpdDownloadStrategy = ipd_download_new_or_modified,
clean: bool = True, clean: bool = True,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user file_conflict_resolver: FileConflictResolver = resolve_prompt_user,
) -> Organizer: ) -> Organizer:
""" """
Synchronizes a folder with a DIVA playlist. Synchronizes a folder with a DIVA playlist.
@ -365,7 +331,9 @@ class Pferd(Location):
self._print_transformables(transformed) self._print_transformables(transformed)
return organizer return organizer
downloader = IpdDownloader(tmp_dir=tmp_dir, organizer=organizer, strategy=download_strategy) downloader = IpdDownloader(
tmp_dir=tmp_dir, organizer=organizer, strategy=download_strategy
)
downloader.download_all(transformed) downloader.download_all(transformed)
if clean: if clean:
@ -383,7 +351,7 @@ class Pferd(Location):
transform: Transform = lambda x: x, transform: Transform = lambda x: x,
download_strategy: DivaDownloadStrategy = diva_download_new, download_strategy: DivaDownloadStrategy = diva_download_new,
clean: bool = True, clean: bool = True,
file_conflict_resolver: FileConflictResolver = resolve_prompt_user file_conflict_resolver: FileConflictResolver = resolve_prompt_user,
) -> Organizer: ) -> Organizer:
""" """
Synchronizes a folder with a DIVA playlist. Synchronizes a folder with a DIVA playlist.

View file

@ -8,9 +8,15 @@ from typing import Optional, Type
import httpx import httpx
from rich.console import Console from rich.console import Console
from rich.progress import (BarColumn, DownloadColumn, Progress, TaskID, from rich.progress import (
TextColumn, TimeRemainingColumn, BarColumn,
TransferSpeedColumn) DownloadColumn,
Progress,
TaskID,
TextColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
_progress: Progress = Progress( _progress: Progress = Progress(
TextColumn("[bold blue]{task.fields[name]}", justify="right"), TextColumn("[bold blue]{task.fields[name]}", justify="right"),
@ -23,7 +29,7 @@ _progress: Progress = Progress(
"", "",
TimeRemainingColumn(), TimeRemainingColumn(),
console=Console(file=sys.stdout), console=Console(file=sys.stdout),
transient=True transient=True,
) )
@ -47,11 +53,12 @@ class ProgressSettings:
""" """
Settings you can pass to customize the progress bar. Settings you can pass to customize the progress bar.
""" """
name: str name: str
max_size: int max_size: int
def progress_for(settings: Optional[ProgressSettings]) -> 'ProgressContextManager': def progress_for(settings: Optional[ProgressSettings]) -> "ProgressContextManager":
""" """
Returns a context manager that displays progress Returns a context manager that displays progress
@ -70,16 +77,14 @@ class ProgressContextManager:
self._settings = settings self._settings = settings
self._task_id: Optional[TaskID] = None self._task_id: Optional[TaskID] = None
def __enter__(self) -> 'ProgressContextManager': def __enter__(self) -> "ProgressContextManager":
"""Context manager entry function.""" """Context manager entry function."""
if not self._settings: if not self._settings:
return self return self
_progress.start() _progress.start()
self._task_id = _progress.add_task( self._task_id = _progress.add_task(
self._settings.name, self._settings.name, total=self._settings.max_size, name=self._settings.name
total=self._settings.max_size,
name=self._settings.name
) )
return self return self

View file

@ -25,7 +25,7 @@ class TmpDir(Location):
"""Format the folder as a string.""" """Format the folder as a string."""
return f"Folder at {self.path}" return f"Folder at {self.path}"
def __enter__(self) -> 'TmpDir': def __enter__(self) -> "TmpDir":
"""Context manager entry function.""" """Context manager entry function."""
return self return self
@ -52,7 +52,7 @@ class TmpDir(Location):
return self.resolve(Path(name)) return self.resolve(Path(name))
def new_subdir(self, prefix: Optional[str] = None) -> 'TmpDir': def new_subdir(self, prefix: Optional[str] = None) -> "TmpDir":
""" """
Create a new nested temporary folder and return it. Create a new nested temporary folder and return it.
""" """

View file

@ -45,11 +45,14 @@ def apply_transform(
result.append(transformable) result.append(transformable)
return result return result
# Transform combinators # Transform combinators
def keep(path: PurePath) -> Optional[PurePath]: def keep(path: PurePath) -> Optional[PurePath]:
return path return path
def attempt(*args: Transform) -> Transform: def attempt(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
for transform in args: for transform in args:
@ -57,11 +60,14 @@ def attempt(*args: Transform) -> Transform:
if result: if result:
return result return result
return None return None
return inner return inner
def optionally(transform: Transform) -> Transform: def optionally(transform: Transform) -> Transform:
return attempt(transform, lambda path: path) return attempt(transform, lambda path: path)
def do(*args: Transform) -> Transform: def do(*args: Transform) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
current = path current = path
@ -72,43 +78,56 @@ def do(*args: Transform) -> Transform:
else: else:
return None return None
return current return current
return inner return inner
def predicate(pred: Callable[[PurePath], bool]) -> Transform: def predicate(pred: Callable[[PurePath], bool]) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
if pred(path): if pred(path):
return path return path
return None return None
return inner return inner
def glob(pattern: str) -> Transform: def glob(pattern: str) -> Transform:
return predicate(lambda path: path.match(pattern)) return predicate(lambda path: path.match(pattern))
def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform: def move_dir(source_dir: PathLike, target_dir: PathLike) -> Transform:
source_path = to_path(source_dir) source_path = to_path(source_dir)
target_path = to_path(target_dir) target_path = to_path(target_dir)
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
if source_path in path.parents: if source_path in path.parents:
return target_path / path.relative_to(source_path) return target_path / path.relative_to(source_path)
return None return None
return inner return inner
def move(source: PathLike, target: PathLike) -> Transform: def move(source: PathLike, target: PathLike) -> Transform:
source_path = to_path(source) source_path = to_path(source)
target_path = to_path(target) target_path = to_path(target)
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
if path == source_path: if path == source_path:
return target_path return target_path
return None return None
return inner return inner
def rename(source: str, target: str) -> Transform: def rename(source: str, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
if path.name == source: if path.name == source:
return path.with_name(target) return path.with_name(target)
return None return None
return inner return inner
def re_move(regex: Regex, target: str) -> Transform: def re_move(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(str(path)) match = to_pattern(regex).fullmatch(str(path))
@ -117,8 +136,10 @@ def re_move(regex: Regex, target: str) -> Transform:
groups.extend(match.groups()) groups.extend(match.groups())
return PurePath(target.format(*groups)) return PurePath(target.format(*groups))
return None return None
return inner return inner
def re_rename(regex: Regex, target: str) -> Transform: def re_rename(regex: Regex, target: str) -> Transform:
def inner(path: PurePath) -> Optional[PurePath]: def inner(path: PurePath) -> Optional[PurePath]:
match = to_pattern(regex).fullmatch(path.name) match = to_pattern(regex).fullmatch(path.name)
@ -127,6 +148,7 @@ def re_rename(regex: Regex, target: str) -> Transform:
groups.extend(match.groups()) groups.extend(match.groups())
return path.with_name(target.format(*groups)) return path.with_name(target.format(*groups))
return None return None
return inner return inner
@ -136,7 +158,7 @@ def sanitize_windows_path(path: PurePath) -> PurePath:
This method is a no-op on other operating systems. This method is a no-op on other operating systems.
""" """
# Escape windows illegal path characters # Escape windows illegal path characters
if os.name == 'nt': if os.name == "nt":
sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)] sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)]
return PurePath(*sanitized_parts) return PurePath(*sanitized_parts)
return path return path

View file

@ -62,7 +62,7 @@ def stream_to_path(
else: else:
settings = None settings = None
with open(target, 'wb') as file_descriptor: with open(target, "wb") as file_descriptor:
with progress_for(settings) as progress: with progress_for(settings) as progress:
for chunk in response.iter_bytes(): for chunk in response.iter_bytes():
file_descriptor.write(chunk) file_descriptor.write(chunk)