run autopep8 and fix formatting

This commit is contained in:
be7a 2021-04-28 23:09:38 +02:00
parent 1d3e169ead
commit 2d6be9f5c1
No known key found for this signature in database
GPG key ID: 6510870A77F49A99
13 changed files with 79 additions and 40 deletions

View file

@ -83,7 +83,8 @@ class DivaPlaylistCrawler:
body = response.json()
if body["error"]:
raise FatalException(f"DIVA: Server returned error {body['error']!r}.")
raise FatalException(
f"DIVA: Server returned error {body['error']!r}.")
return body["result"]["collection"]["id"]
@ -91,9 +92,11 @@ class DivaPlaylistCrawler:
"""
Crawls the playlist given in the constructor.
"""
response = httpx.get(self._COLLECTION_BASE_URL, params={"collection": self._id})
response = httpx.get(self._COLLECTION_BASE_URL,
params={"collection": self._id})
if response.status_code != 200:
raise FatalException(f"Server returned status {response.status_code}.")
raise FatalException(
f"Server returned status {response.status_code}.")
body = response.json()
@ -109,7 +112,8 @@ class DivaPlaylistCrawler:
for video in result["resultList"]:
title = video["title"]
collection_title = self._follow_path(["collection", "title"], video)
collection_title = self._follow_path(
["collection", "title"], video)
url = self._follow_path(
["resourceList", "derivateList", "mp4", "url"], video
)

View file

@ -53,7 +53,8 @@ def retry_on_io_exception(max_retries: int, message: str) -> Callable[[TFun], TF
try:
return function(*args, **kwargs)
except IOError as error:
PRETTY.warning(f"Error duing operation '{message}': {error}")
PRETTY.warning(
f"Error duing operation '{message}': {error}")
PRETTY.warning(
f"Retrying operation '{message}'. Remaining retries: {max_retries - 1 - i}"
)

View file

@ -71,7 +71,8 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
while not self._login_successful(soup):
# Searching the form here so that this fails before asking for
# credentials rather than after asking.
form = soup.find("form", {"class": "full content", "method": "post"})
form = soup.find(
"form", {"class": "full content", "method": "post"})
action = form["action"]
csrf_token = form.find("input", {"name": "csrf_token"})["value"]
@ -119,7 +120,8 @@ class KitShibbolethAuthenticator(IliasAuthenticator):
# https://idp.scc.kit.edu/idp/profile/SAML2/Redirect/SSO
LOGGER.debug("Attempt to log in to Shibboleth with TFA token")
url = "https://idp.scc.kit.edu" + action
data = {"_eventId_proceed": "", "j_tokenNumber": self._tfa_auth.get_token()}
data = {"_eventId_proceed": "",
"j_tokenNumber": self._tfa_auth.get_token()}
return soupify(await client.post(url, data=data))
@staticmethod

View file

@ -315,7 +315,8 @@ class IliasCrawler:
if soup.find(id="headerimage"):
element: bs4.Tag = soup.find(id="headerimage")
if "opencast" in element.attrs["src"].lower():
PRETTY.warning(f"Switched to crawling a video at {folder_path}")
PRETTY.warning(
f"Switched to crawling a video at {folder_path}")
if not self.dir_filter(folder_path, IliasElementType.VIDEO_FOLDER):
PRETTY.not_searching(folder_path, "user filter")
return []
@ -330,7 +331,8 @@ class IliasCrawler:
element_path = Path(
folder_path, _sanitize_path_name(link.getText().strip())
)
element_type = self._find_type_from_link(element_path, link, abs_url)
element_type = self._find_type_from_link(
element_path, link, abs_url)
if element_type == IliasElementType.REGULAR_FILE:
result += self._crawl_file(folder_path, link, abs_url)
@ -341,13 +343,14 @@ class IliasCrawler:
if not date_portion:
result += [
IliasCrawlerEntry(element_path, abs_url, element_type, None)
IliasCrawlerEntry(
element_path, abs_url, element_type, None)
]
continue
rest_of_name = meeting_name
if rest_of_name.startswith(date_portion_str):
rest_of_name = rest_of_name[len(date_portion_str) :]
rest_of_name = rest_of_name[len(date_portion_str):]
new_name = (
datetime.datetime.strftime(date_portion, "%Y-%m-%d, %H:%M")
@ -360,9 +363,11 @@ class IliasCrawler:
)
]
elif element_type is not None:
result += [IliasCrawlerEntry(element_path, abs_url, element_type, None)]
result += [IliasCrawlerEntry(element_path,
abs_url, element_type, None)]
else:
PRETTY.warning(f"Found element without a type at {str(element_path)!r}")
PRETTY.warning(
f"Found element without a type at {str(element_path)!r}")
return result
@ -424,7 +429,8 @@ class IliasCrawler:
return None
# Find the small descriptive icon to figure out the type
img_tag: Optional[bs4.Tag] = found_parent.select_one("img.ilListItemIcon")
img_tag: Optional[bs4.Tag] = found_parent.select_one(
"img.ilListItemIcon")
if img_tag is None:
PRETTY.warning(f"Could not find image tag for {url!r}")
@ -462,7 +468,8 @@ class IliasCrawler:
).select_one(".il_ItemProperties")
# The first one is always the filetype
file_type = (
properties_parent.select_one("span.il_ItemProperty").getText().strip()
properties_parent.select_one(
"span.il_ItemProperty").getText().strip()
)
# The rest does not have a stable order. Grab the whole text and reg-ex the date
@ -474,7 +481,8 @@ class IliasCrawler:
)
if modification_date_match is None:
modification_date = None
PRETTY.warning(f"Could not extract start date from {all_properties_text!r}")
PRETTY.warning(
f"Could not extract start date from {all_properties_text!r}")
else:
modification_date_str = modification_date_match.group(1)
modification_date = demangle_date(modification_date_str)
@ -598,7 +606,8 @@ class IliasCrawler:
results += self._crawl_single_video(video_dir_path, link, True)
else:
for link in video_links:
results += self._crawl_single_video(video_dir_path, link, False)
results += self._crawl_single_video(
video_dir_path, link, False)
return results
@ -667,7 +676,8 @@ class IliasCrawler:
json_match = regex.search(str(video_page_soup))
if json_match is None:
PRETTY.warning(f"Could not find json stream info for {play_url!r}")
PRETTY.warning(
f"Could not find json stream info for {play_url!r}")
return None
json_str = json_match.group(1)
@ -713,7 +723,8 @@ class IliasCrawler:
for file_link in files:
# Two divs, side by side. Left is the name, right is the link ==> get left
# sibling
file_name = file_link.parent.findPrevious(name="div").getText().strip()
file_name = file_link.parent.findPrevious(
name="div").getText().strip()
file_name = _sanitize_path_name(file_name)
url = self._abs_url_from_link(file_link)

View file

@ -38,7 +38,8 @@ def demangle_date(date: str) -> Optional[datetime.datetime]:
date = re.sub(
"Heute|Today", datetime.date.today().strftime("%d. %b %Y"), date, re.I
)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime("%d. %b %Y"), date, re.I)
date = re.sub("Morgen|Tomorrow", _tomorrow().strftime(
"%d. %b %Y"), date, re.I)
return datetime.datetime.strptime(date, "%d. %b %Y, %H:%M")
except ValueError:
PRETTY.warning(f"Could not parse date {date!r}")

View file

@ -72,7 +72,8 @@ def download_modified_or_new(organizer: Organizer, info: IliasDownloadInfo) -> b
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
PRETTY.ignored_file(
info.path, "local file has newer or equal modification time")
return False
@ -126,7 +127,8 @@ class IliasDownloader:
@retry_on_io_exception(3, "downloading file")
async def download_impl() -> bool:
if not await self._try_download(info, tmp_file):
LOGGER.info("Re-Authenticating due to download failure: %r", info)
LOGGER.info(
"Re-Authenticating due to download failure: %r", info)
self._authenticator.authenticate(self._client)
raise IOError("Scheduled retry")
else:
@ -151,7 +153,8 @@ class IliasDownloader:
async def _try_download(self, info: IliasDownloadInfo, target: Path) -> bool:
url = await info.url()
if url is None:
PRETTY.warning(f"Could not download {str(info.path)!r} as I got no URL :/")
PRETTY.warning(
f"Could not download {str(info.path)!r} as I got no URL :/")
return True
with self._client.stream("GET", url, timeout=self._timeout) as response:

View file

@ -47,7 +47,8 @@ def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) ->
if not resolved_file.exists():
return True
if not info.modification_date:
PRETTY.ignored_file(info.path, "could not find modification time, file exists")
PRETTY.ignored_file(
info.path, "could not find modification time, file exists")
return False
resolved_mod_time_seconds = resolved_file.stat().st_mtime
@ -56,7 +57,8 @@ def ipd_download_new_or_modified(organizer: Organizer, info: IpdDownloadInfo) ->
if info.modification_date.timestamp() > resolved_mod_time_seconds:
return True
PRETTY.ignored_file(info.path, "local file has newer or equal modification time")
PRETTY.ignored_file(
info.path, "local file has newer or equal modification time")
return False
@ -163,7 +165,8 @@ class IpdDownloader:
)
elif response.status_code == 403:
raise FatalException("Received 403. Are you not using the KIT VPN?")
raise FatalException(
"Received 403. Are you not using the KIT VPN?")
else:
PRETTY.warning(
f"Could not download file, got response {response.status_code}"

View file

@ -37,6 +37,7 @@ class Location:
# TODO Make this less inefficient
if self.path not in absolute_path.parents:
raise ResolveException(f"Path {target} is not inside directory {self.path}")
raise ResolveException(
f"Path {target} is not inside directory {self.path}")
return absolute_path

View file

@ -43,7 +43,8 @@ class RichLoggingHandler(logging.Handler):
self.console = Console(
theme=Theme({"logging.level.warning": Style(color="yellow")})
)
self._log_render = LogRender(show_level=True, show_time=False, show_path=False)
self._log_render = LogRender(
show_level=True, show_time=False, show_path=False)
def emit(self, record: logging.LogRecord) -> None:
"""
@ -104,14 +105,16 @@ class PrettyLogger:
A new file has been downloaded.
"""
self.logger.info(f"[bold green]Created {self._format_path(path)}.[/bold green]")
self.logger.info(
f"[bold green]Created {self._format_path(path)}.[/bold green]")
def deleted_file(self, path: PathLike) -> None:
"""
A file has been deleted.
"""
self.logger.info(f"[bold red]Deleted {self._format_path(path)}.[/bold red]")
self.logger.info(
f"[bold red]Deleted {self._format_path(path)}.[/bold red]")
def ignored_file(self, path: PathLike, reason: str) -> None:
"""

View file

@ -54,7 +54,8 @@ class FileConflictResolution(Enum):
PROMPT = "prompt"
FileConflictResolver = Callable[[PurePath, ConflictType], FileConflictResolution]
FileConflictResolver = Callable[[
PurePath, ConflictType], FileConflictResolution]
def resolve_prompt_user(
@ -124,7 +125,8 @@ class Organizer(Location):
if self._resolve_conflict(
"Overwrite file?", dst_absolute, conflict, default=False
):
PRETTY.ignored_file(dst_absolute, "file was written previously")
PRETTY.ignored_file(
dst_absolute, "file was written previously")
return None
# Destination file is directory

View file

@ -113,7 +113,8 @@ class Pferd(Location):
def _get_authenticator(
username: Optional[str], password: Optional[str]
) -> KitShibbolethAuthenticator:
inner_auth = UserPassAuthenticator("ILIAS - Pferd.py", username, password)
inner_auth = UserPassAuthenticator(
"ILIAS - Pferd.py", username, password)
return KitShibbolethAuthenticator(inner_auth)
@swallow_and_print_errors
@ -140,7 +141,8 @@ class Pferd(Location):
"""
# This authenticator only works with the KIT ilias instance.
authenticator = Pferd._get_authenticator(username=username, password=password)
authenticator = Pferd._get_authenticator(
username=username, password=password)
return IliasSycronizer(
"https://ilias.studium.kit.edu/", authenticator, cookies, dir_filter
)
@ -243,7 +245,8 @@ class Pferd(Location):
for entry in self._ilias_targets:
tmp_dir = self._tmp_dir.new_subdir()
organizer = Organizer(
self.resolve(to_path(entry.target)), entry.file_conflict_resolver
self.resolve(to_path(entry.target)
), entry.file_conflict_resolver
)
downloader = IliasDownloader(
@ -320,7 +323,8 @@ class Pferd(Location):
if isinstance(target, Organizer):
organizer = target
else:
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
organizer = Organizer(self.resolve(
to_path(target)), file_conflict_resolver)
PRETTY.starting_synchronizer(organizer.path, "IPD", url)
@ -374,7 +378,8 @@ class Pferd(Location):
tmp_dir = self._tmp_dir.new_subdir()
if playlist_location.startswith("http"):
playlist_id = DivaPlaylistCrawler.fetch_id(playlist_link=playlist_location)
playlist_id = DivaPlaylistCrawler.fetch_id(
playlist_link=playlist_location)
else:
playlist_id = playlist_location
@ -385,7 +390,8 @@ class Pferd(Location):
if isinstance(target, Organizer):
organizer = target
else:
organizer = Organizer(self.resolve(to_path(target)), file_conflict_resolver)
organizer = Organizer(self.resolve(
to_path(target)), file_conflict_resolver)
PRETTY.starting_synchronizer(organizer.path, "DIVA", playlist_id)

View file

@ -159,6 +159,7 @@ def sanitize_windows_path(path: PurePath) -> PurePath:
"""
# Escape windows illegal path characters
if os.name == "nt":
sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x) for x in list(path.parts)]
sanitized_parts = [re.sub(r'[<>:"/|?]', "_", x)
for x in list(path.parts)]
return PurePath(*sanitized_parts)
return path

View file

@ -58,7 +58,8 @@ def stream_to_path(
length = size_from_headers(response)
if progress_name and length and int(length) > 1024 * 1024 * 10: # 10 MiB
settings: Optional[ProgressSettings] = ProgressSettings(progress_name, length)
settings: Optional[ProgressSettings] = ProgressSettings(
progress_name, length)
else:
settings = None