From 12be2f9c45567edc014110feb615290cb0a93ff4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jan-Marten=20Br=C3=BCggemann?=
Date: Sat, 14 Sep 2024 02:39:18 +0200
Subject: [PATCH] refactoring

---
 sync_kiwix.py | 213 ++++++++++++++++++++++++++------------------------
 1 file changed, 111 insertions(+), 102 deletions(-)

diff --git a/sync_kiwix.py b/sync_kiwix.py
index fea1705..cb5ad14 100644
--- a/sync_kiwix.py
+++ b/sync_kiwix.py
@@ -17,6 +17,14 @@ KIWIX_BASE_URL = "https://download.kiwix.org/zim/"
 class CacheException(Exception):
     pass
 
+class NoMatchException(Exception):
+    def __init__(self, message: str, wiki: str):
+        super().__init__(message)
+        self.wiki = wiki
+
+class DownloadException(Exception):
+    pass
+
 def parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(
         prog="sync_kiwix",
@@ -26,102 +34,82 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument('destination', type=str, help="Destination directory")
     parser.add_argument('--cache', action=argparse.BooleanOptionalAction, default=True, help="Use filelist cache (default: yes)")
     parser.add_argument('--cache-file', type=str, default=os.path.join(os.getcwd(), ".cache"), help="Path to cache file (default: ./.cache)")
+    parser.add_argument('--cache-max-age', type=int, default=1, help="Maximum age of the cached file list in days")
 
     return parser.parse_args()
 
-def load_cached_filelist(cache_file: str) -> list:
+def load_cache(cache_file: str) -> list:
     try:
         with open(file=cache_file, mode='r') as file:
             return json.load(file)
     except json.decoder.JSONDecodeError:
         raise CacheException('Could not decode JSON')
 
-def save_cached_filelist(cache_file: str, filelist: list):
+def save_cache(cache_file: str, zim_list: list):
     with open(file=cache_file, mode='w') as file:
-        json.dump(filelist, file)
+        json.dump(zim_list, file)
 
-def retreive_filelist(session: requests.Session, url: str) -> list:
-    response = session.get(url)
-    response.raise_for_status()
+def retreive_zim_list(session: requests.Session, url: str) -> list:
+    try:
+        response = session.get(url)
+        response.raise_for_status()
+    except requests.exceptions.HTTPError as err:
+        raise DownloadException(str(err))
 
     directories = re.findall(r'<a href="([^"]+/)">', response.text)
     files = re.findall(r'<a href="([^"]+\.zim)">', response.text)
 
     result = list()
-
     for directory in directories:
-        result += retreive_filelist(session=session, url=urljoin(url, directory))
+        result += retreive_zim_list(session=session, url=urljoin(url, directory))
 
     for file in files:
-        result.append([url, file])
+
+        file_base, file_extension = os.path.splitext(file)
+        file_base_without_date, file_date = file_base.rsplit('_', 1)
+
+        result.append({
+            "url": urljoin(url, file),
+            "url_base": url,
+            "file": file,
+            "file_base": file_base,
+            "file_date": file_date,
+            "file_extension": file_extension,
+            "file_base_without_date": file_base_without_date
+        })
 
     return result
 
-def get_download_candidates(filelist: list, wiki: str) -> dict:
-    candidates = list()
+def print_fuzzy_matches(wiki: str, zim_list: list):
+    print(f"Here is a list of zim files similar to your request:", file=sys.stderr)
 
-    for file_item in filelist:
-        path = file_item[0]
-        file = file_item[1]
-
-        file_name, file_extension = os.path.splitext(file)
-        file_base, file_date = file_name.rsplit('_', 1)
-
-        if wiki != file_base:
-            continue
-
-        candidates.append((
-            path,
-            file,
-            file_name,
-            file_extension,
-            file_base,
-            file_date
-        ))
-
-    return candidates
-
-def error_no_candidate(filelist: list, wiki: str):
-    print(f"Could not find any match to {wiki}.\n", file=sys.stderr)
-    print(f"Here is a list of urls similar to your request:", file=sys.stderr)
-
-    for file_item in filelist:
-        url = urljoin(file_item[0], file_item[1])
-        if wiki in url:
-            print(f" - {url}", file=sys.stderr)
+    for zim in zim_list:
+        if wiki in zim["url"]:
+            print(f" - {zim['file_base_without_date']} ({zim['url']})", file=sys.stderr)
 
-    raise SystemExit("Aborting.")
-
-def get_download_files(wikis: list, filelist: list) -> list:
-    download_files = list()
+def generate_download_list(wikis: list, zim_list: list) -> list:
+    result = list()
 
     for wiki in wikis:
-        candidates = get_download_candidates(filelist=filelist, wiki=wiki)
-        if not candidates:
-            error_no_candidate(wiki=wiki, filelist=filelist)
+        current_zim = None
+        for zim in zim_list:
+            if wiki == zim["file_base_without_date"]:
+                if current_zim is None:
+                    current_zim = zim
+                    continue
+
+                current_zim_date = datetime.datetime.strptime(current_zim["file_date"], "%Y-%m")
+                zim_date = datetime.datetime.strptime(zim["file_date"], "%Y-%m")
+                if zim_date > current_zim_date:
+                    current_zim = zim
 
-        # Get most current candidate
-        candidate0 = None
-        for candidate in candidates:
-            if candidate0 is None:
-                candidate0 = candidate
-                continue
-
-            candidate0_date = datetime.datetime.strptime(candidate0[5], "%Y-%m")
-            candidate_date = datetime.datetime.strptime(candidate[5], "%Y-%m")
+        if current_zim is None:
+            raise NoMatchException(f"Could not find any matches for {wiki}", wiki=wiki)
+
+        result.append(current_zim)
 
-            if candidate_date > candidate0_date:
-                candidate0 = candidate
-
-        download_files.append((
-            wiki,
-            candidate0[3],
-            urljoin(candidate0[0], candidate0[1]),
-            candidate0[1]
-        ))
-
-    return download_files
+    return result
 
 def check_is_newer(path: str, last_modified: str) -> bool:
     try:
@@ -132,43 +120,52 @@ def check_is_newer(path: str, last_modified: str) -> bool:
     mtime_remote = datetime.datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z")
     return mtime_remote.date() > mtime_local.date()
 
-def download_wiki(session: requests.Session, title: str, src: str, dst: str):
+def download_zim(session: requests.Session, zim: dict, dst: str):
+
+    # Get sha256 digest from server
+    try:
+        response = session.head(url=zim["url"])
+        response.raise_for_status()
+    except requests.exceptions.HTTPError as err:
+        raise DownloadException(str(err))
 
-    # Get digest
-    response = session.head(url=src)
     digests = response.headers.get("Digest").split(",")
-    digest_value = None
+
     for digest in digests:
         method, data = digest.strip().split("=", 1)
         if method == "SHA-256":
             digest_value = base64.b64decode(data)
             break
     else:
-        raise SystemExit(f"Could not get SHA-256 digest for {title}. Aborting.")
+        raise DownloadException(f"Could not get SHA-256 digest for {zim['file']}. Aborting.")
 
     sha256 = hashlib.sha256()
-    with session.get(url=src, stream=True) as response:
-        response.raise_for_status()
+    try:
+        with session.get(url=zim["url"], stream=True) as response:
+            response.raise_for_status()
 
-        if not check_is_newer(dst, response.headers.get("Last-Modified")):
-            print(f"{title} was not updated. Skipping...")
-            return
+            if not check_is_newer(dst, response.headers.get("Last-Modified")):
+                print(f"{zim['file_base_without_date']} was not updated on server. Skipping...")
Skipping...") + return - total_size = int(response.headers.get("content-length", 0)) - block_size = 1024 - with tqdm(total=total_size, unit="B", unit_scale=True, desc=title) as progress_bar: - with open(dst + '.part', "wb") as file: - for block in response.iter_content(block_size): - progress_bar.update(len(block)) - file.write(block) - sha256.update(block) + total_size = int(response.headers.get("content-length", 0)) + block_size = 1024 + with tqdm(total=total_size, unit="B", unit_scale=True, desc=zim['file_base_without_date']) as progress_bar: + with open(dst + '.part', "wb") as file: + for block in response.iter_content(block_size): + progress_bar.update(len(block)) + file.write(block) + sha256.update(block) + + except requests.exceptions.HTTPError as err: + raise DownloadException(str(err)) if digest_value != sha256.digest(): - raise SystemExit(f"Checksum Error for {title}. Aborting.") + raise DownloadException(f"Checksum Error for {zim['file_base_without_date']}. Aborting.") - sha256sum = sha256.hexdigest() + sha256_hex = sha256.hexdigest() with open(dst + '.sha256sum', 'w') as file: - file.write(f"{sha256sum} {dst}\n") + file.write(f"{sha256_hex} {dst}\n") os.rename(dst + '.part', dst) @@ -178,28 +175,40 @@ def main(): # Create Session session = requests.Session() - # Get Filelist - filelist = None - + # Read ZIM list from cache if enabled + zim_list = None if args.cache == True: try: mtime = datetime.datetime.fromtimestamp(os.path.getmtime(args.cache_file)) - if datetime.datetime.now() - mtime < datetime.timedelta(days=1): - filelist = load_cached_filelist(cache_file=args.cache_file) + if datetime.datetime.now() - mtime < datetime.timedelta(days=args.cache_max_age): + zim_list = load_cache(cache_file=args.cache_file) except (FileNotFoundError, CacheException): pass - if filelist is None: - filelist = retreive_filelist(session=session, url=KIWIX_BASE_URL) - save_cached_filelist(cache_file=args.cache_file, filelist=filelist) + # Retreive new list, if no cached file was loaded + if zim_list is None: + try: + zim_list = retreive_zim_list(session=session, url=KIWIX_BASE_URL) + except DownloadException as err: + raise SystemExit(str(err)) + + save_cache(cache_file=args.cache_file, zim_list=zim_list) - # Get download files list - download_files = get_download_files(wikis=args.wiki, filelist=filelist) + # Get download list + try: + download_list = generate_download_list(wikis=args.wiki, zim_list=zim_list) + except NoMatchException as err: + print(f"{str(err)}\n", file=sys.stderr) + print_fuzzy_matches(wiki=err.wiki, zim_list=zim_list) + raise SystemExit("\nAborting.") - # Download files - for download_file in download_files: - file_path = os.path.join(args.destination, download_file[0] + download_file[1]) - download_wiki(session=session, title=download_file[3], src=download_file[2], dst=file_path) + # Download zim files + try: + for zim in download_list: + dst_path = os.path.join(args.destination, zim["file_base_without_date"] + zim["file_extension"]) + download_zim(session=session, zim=zim, dst=dst_path) + except DownloadException as err: + raise SystemExit(str(err)) if __name__ == "__main__": main() \ No newline at end of file