diff --git a/sync_kiwix.py b/sync_kiwix.py index cb5ad14..06ea62b 100644 --- a/sync_kiwix.py +++ b/sync_kiwix.py @@ -14,41 +14,63 @@ from tqdm import tqdm KIWIX_BASE_URL = "https://download.kiwix.org/zim/" + class CacheException(Exception): pass + class NoMatchException(Exception): def __init__(self, message: str, wiki: str): super().__init__(message) self.wiki = wiki + class DownloadException(Exception): pass - + + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog="sync_kiwix", description="Synchronize zim files for kiwix", ) - parser.add_argument('wiki', type=str, nargs="+", help="Wikis to synchronize") - parser.add_argument('destination', type=str, help="Destination directory") - parser.add_argument('--cache', action=argparse.BooleanOptionalAction, default=True, help="Use filelist cache (default: yes)") - parser.add_argument('--cache-file', type=str, default=os.path.join(os.getcwd(), ".cache"), help="Path to cache file (default: ./.cache)") - parser.add_argument('--cache-max-age', type=int, default=1, help="Maximum age of the cached file list in days") + parser.add_argument("wiki", type=str, nargs="+", help="Wikis to synchronize") + parser.add_argument("destination", type=str, help="Destination directory") + parser.add_argument( + "--cache", + action=argparse.BooleanOptionalAction, + default=True, + help="Use filelist cache (default: yes)", + ) + parser.add_argument( + "--cache-file", + type=str, + default=os.path.join(os.getcwd(), ".cache"), + help="Path to cache file (default: ./.cache)", + ) + parser.add_argument( + "--cache-max-age", + type=int, + default=1, + help="Maximum age of the cached file list in days", + ) return parser.parse_args() + def load_cache(cache_file: str) -> list: try: - with open(file=cache_file, mode='r') as file: + with open(file=cache_file, mode="r") as file: return json.load(file) except json.decoder.JSONDecodeError: - raise CacheException('Could not decode JSON') + raise CacheException("Could not decode JSON") + def save_cache(cache_file: str, zim_list: list): - with open(file=cache_file, mode='w') as file: + with open(file=cache_file, mode="w") as file: json.dump(zim_list, file) + def retreive_zim_list(session: requests.Session, url: str) -> list: try: response = session.get(url) @@ -66,27 +88,31 @@ def retreive_zim_list(session: requests.Session, url: str) -> list: for file in files: file_base, file_extension = os.path.splitext(file) - file_base_without_date, file_date = file_base.rsplit('_', 1) + file_base_without_date, file_date = file_base.rsplit("_", 1) - result.append({ - "url": urljoin(url, file), - "url_base": url, - "file": file, - "file_base": file_base, - "file_date": file_date, - "file_extension": file_extension, - "file_base_without_date": file_base_without_date - }) + result.append( + { + "url": urljoin(url, file), + "url_base": url, + "file": file, + "file_base": file_base, + "file_date": file_date, + "file_extension": file_extension, + "file_base_without_date": file_base_without_date, + } + ) return result + def print_fuzzy_matches(wiki: str, zim_list: list): print(f"Here is a list of zim files similar to your request:", file=sys.stderr) for zim in zim_list: if wiki in zim["url"]: - print(f" - {zim['file_base_without_date']} ({zim['url']})", file=sys.stderr) - + print(f" - {zim['file_base_without_date']} ({zim['url']})", file=sys.stderr) + + def generate_download_list(wikis: list, zim_list: list) -> list: result = list() @@ -99,18 +125,21 @@ def generate_download_list(wikis: list, zim_list: list) -> list: current_zim = zim continue - current_zim_date = datetime.datetime.strptime(current_zim["file_date"], "%Y-%m") + current_zim_date = datetime.datetime.strptime( + current_zim["file_date"], "%Y-%m" + ) zim_date = datetime.datetime.strptime(zim["file_date"], "%Y-%m") if zim_date > current_zim_date: current_zim = zim - + if current_zim is None: raise NoMatchException(f"Could not find any matches for {wiki}", wiki=wiki) - + result.append(current_zim) return result + def check_is_newer(path: str, last_modified: str) -> bool: try: mtime_local = datetime.datetime.fromtimestamp(os.path.getmtime(path)) @@ -120,6 +149,7 @@ def check_is_newer(path: str, last_modified: str) -> bool: mtime_remote = datetime.datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z") return mtime_remote.date() > mtime_local.date() + def download_zim(session: requests.Session, zim: dict, dst: str): # Get sha256 digest from server @@ -137,21 +167,30 @@ def download_zim(session: requests.Session, zim: dict, dst: str): digest_value = base64.b64decode(data) break else: - raise DownloadException(f"Could not get SHA-256 digest for {zim['file']}. Aborting.") - + raise DownloadException( + f"Could not get SHA-256 digest for {zim['file']}. Aborting." + ) + sha256 = hashlib.sha256() try: with session.get(url=zim["url"], stream=True) as response: response.raise_for_status() if not check_is_newer(dst, response.headers.get("Last-Modified")): - print(f"{zim['file_base_without_date']} was not updated on server. Skipping...") + print( + f"{zim['file_base_without_date']} was not updated on server. Skipping..." + ) return total_size = int(response.headers.get("content-length", 0)) block_size = 1024 - with tqdm(total=total_size, unit="B", unit_scale=True, desc=zim['file_base_without_date']) as progress_bar: - with open(dst + '.part', "wb") as file: + with tqdm( + total=total_size, + unit="B", + unit_scale=True, + desc=zim["file_base_without_date"], + ) as progress_bar: + with open(dst + ".part", "wb") as file: for block in response.iter_content(block_size): progress_bar.update(len(block)) file.write(block) @@ -161,13 +200,16 @@ def download_zim(session: requests.Session, zim: dict, dst: str): raise DownloadException(str(err)) if digest_value != sha256.digest(): - raise DownloadException(f"Checksum Error for {zim['file_base_without_date']}. Aborting.") + raise DownloadException( + f"Checksum Error for {zim['file_base_without_date']}. Aborting." + ) sha256_hex = sha256.hexdigest() - with open(dst + '.sha256sum', 'w') as file: + with open(dst + ".sha256sum", "w") as file: file.write(f"{sha256_hex} {dst}\n") - os.rename(dst + '.part', dst) + os.rename(dst + ".part", dst) + def main(): args = parse_args() @@ -180,7 +222,9 @@ def main(): if args.cache == True: try: mtime = datetime.datetime.fromtimestamp(os.path.getmtime(args.cache_file)) - if datetime.datetime.now() - mtime < datetime.timedelta(days=args.cache_max_age): + if datetime.datetime.now() - mtime < datetime.timedelta( + days=args.cache_max_age + ): zim_list = load_cache(cache_file=args.cache_file) except (FileNotFoundError, CacheException): pass @@ -191,7 +235,7 @@ def main(): zim_list = retreive_zim_list(session=session, url=KIWIX_BASE_URL) except DownloadException as err: raise SystemExit(str(err)) - + save_cache(cache_file=args.cache_file, zim_list=zim_list) # Get download list @@ -205,10 +249,13 @@ def main(): # Download zim files try: for zim in download_list: - dst_path = os.path.join(args.destination, zim["file_base_without_date"] + zim["file_extension"]) + dst_path = os.path.join( + args.destination, zim["file_base_without_date"] + zim["file_extension"] + ) download_zim(session=session, zim=zim, dst=dst_path) except DownloadException as err: raise SystemExit(str(err)) - + + if __name__ == "__main__": - main() \ No newline at end of file + main()