#!/usr/bin/env python3 import sys import os import datetime import argparse import requests import re import json import hashlib import base64 from urllib.parse import urljoin from tqdm import tqdm KIWIX_BASE_URL = "https://download.kiwix.org/zim/" class CacheException(Exception): pass class MultipleFileException(Exception): pass def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog="sync_kiwix", description="Synchronize zim files for kiwix", ) parser.add_argument('wiki', type=str, nargs="+", help="Wikis to synchronize") parser.add_argument('destination', type=str, help="Destination directory") parser.add_argument('--cache', action=argparse.BooleanOptionalAction, default=True, help="Use filelist cache (default: yes)") parser.add_argument('--cache-file', type=str, default=os.path.join(os.getcwd(), ".cache"), help="Path to cache file (default: ./.cache)") return parser.parse_args() def load_cached_filelist(cache_file: str) -> list: try: with open(file=cache_file, mode='r') as file: return json.load(file) except json.decoder.JSONDecodeError: raise CacheException('Could not decode JSON') def save_cached_filelist(cache_file: str, filelist: list): with open(file=cache_file, mode='w') as file: json.dump(filelist, file) def retreive_filelist(session: requests.Session, url: str, cache_file: str = "", cache_max_age: datetime.timedelta = datetime.timedelta(days=1)) -> list: if cache_file != "": try: mtime = datetime.datetime.fromtimestamp(os.path.getmtime(cache_file)) if datetime.datetime.now() - mtime < cache_max_age: return load_cached_filelist(cache_file=cache_file) except (FileNotFoundError, CacheException): pass response = session.get(url) response.raise_for_status() directories = re.findall(r'', response.text) files = re.findall(r'', response.text) result = list() for directory in directories: result += retreive_filelist(session=session, url=urljoin(url, directory)) for file in files: result.append([url, file]) if cache_file != "": save_cached_filelist(cache_file=cache_file, filelist=result) return result def find_wiki_files(filelist: list, wiki: str) -> list: result = list() for file in filelist: if file[1].startswith(wiki): result.append(file) return result def error_multiple_files(wiki: str, wiki_files: str): print(f"{wiki} has multiple matches. Please specify your input more precisely.\n", file=sys.stderr) print(f"{wiki} matched to:", file=sys.stderr) for file in wiki_files: url = urljoin(file[0], file[1]) print(f" - {url}", file=sys.stderr) raise SystemExit("Aborting.") def get_download_candidates(wiki_files: list, wiki: str) -> dict: candidates = list() # Check if results are unique path0 = None file_base0 = None for wiki_file in wiki_files: path = wiki_file[0] file = wiki_file[1] if path0 is None: path0 = path if path0 != path: raise MultipleFileException(wiki) file_name, file_extension = os.path.splitext(file) file_base, file_date = file_name.rsplit('_', 1) if file_base0 is None: file_base0 = file_base if file_base0 != file_base: raise MultipleFileException(wiki) candidates.append(( path, file, file_name, file_extension, file_base, file_date )) return candidates def get_wiki_files(wikis: list, filelist: list) -> list: download_files = list() for wiki in wikis: wiki_files = find_wiki_files(filelist=filelist, wiki=wiki) try: candidates = get_download_candidates(wiki_files=wiki_files, wiki=wiki) except MultipleFileException: error_multiple_files(wiki=wiki, wiki_files=wiki_files) # Get most current candidate candidate0 = None for candidate in candidates: if candidate0 is None: candidate0 = candidate continue candidate0_date = datetime.datetime.strptime(candidate0[5], "%Y-%m") candidate_date = datetime.datetime.strptime(candidate[5], "%Y-%m") if candidate_date > candidate0_date: candidate0 = candidate download_files.append(( wiki, candidate0[3], urljoin(candidate0[0], candidate0[1]), candidate0[1] )) return download_files def check_is_newer(path: str, last_modified: str) -> bool: try: mtime_local = datetime.datetime.fromtimestamp(os.path.getmtime(path)) except FileNotFoundError: return True mtime_remote = datetime.datetime.strptime(last_modified, "%a, %d %b %Y %H:%M:%S %Z") return mtime_remote.date() > mtime_local.date() def download_wiki(session: requests.Session, title: str, src: str, dst: str): # Get digest response = session.head(url=src) digests = response.headers.get("Digest").split(",") digest_value = None for digest in digests: method, data = digest.strip().split("=", 1) if method == "SHA-256": digest_value = base64.b64decode(data) break sha256 = hashlib.sha256() with session.get(url=src, stream=True) as response: response.raise_for_status() if not check_is_newer(dst, response.headers.get("Last-Modified")): print(f"{title} was not updated. Skipping...") return total_size = int(response.headers.get("content-length", 0)) block_size = 1024 with tqdm(total=total_size, unit="B", unit_scale=True, desc=title) as progress_bar: with open(dst + '.part', "wb") as file: for block in response.iter_content(block_size): progress_bar.update(len(block)) file.write(block) sha256.update(block) if digest_value != sha256.digest(): raise SystemExit(f"Checksum Error for {title}. Aborting.") sha256sum = sha256.hexdigest() with open(dst + '.sha256sum', 'w') as file: file.write(f"{sha256sum} {dst}\n") os.rename(dst + '.part', dst) def main(): args = parse_args() # Create Session session = requests.Session() # Get Filelist filelist = retreive_filelist(session=session, url=KIWIX_BASE_URL, cache_file=args.cache_file if args.cache else "") # Get downlaod files list wiki_files = get_wiki_files(wikis=args.wiki, filelist=filelist) # Download files for wiki_file in wiki_files: file_path = os.path.join(args.destination, wiki_file[0] + wiki_file[1]) download_wiki(session=session, title=wiki_file[3], src=wiki_file[2], dst=file_path) if __name__ == "__main__": main()