view kemonopartydownloader.py @ 115:f10492e8720b
kemonopartydownloader.py: add youtube downloader stubs and dump json to disk
author    Paper <mrpapersonic@gmail.com>
date      Mon, 23 Jan 2023 23:58:22 -0500
parents   b14e2a096ebf
children
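
# Example invocation (hypothetical URL and local paths, shown for illustration
# only -- not part of the original file):
#   python kemonopartydownloader.py https://kemono.party/patreon/user/12345 \
#       --cookies cookies.txt --output downloads --timeout 2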
""" Usage: kemonopartydownloader.py <url>... (--cookies <filename>) [--output <folder>] [--proxy <proxy>] [--timeout <seconds>] kemonopartydownloader.py -h | --help Arguments: <url> Kemono.party URL to download from -c --cookies <filename> A Netscape-compatible cookies.txt file Options: -h --help Show this screen -o --output <folder> Output folder, relative to the current directory [default: .] -p --proxy <proxy> HTTP or HTTPS proxy (SOCKS5 with PySocks) -t --timeout <seconds> Time between downloads [default: 1] """ import docopt import http.cookiejar import os import re import requests # pip install requests import time import math import zipfile import urllib.parse import yt_dlp from yt_dlp.utils import sanitize_filename as sanitize from urllib.error import HTTPError from http.client import BadStatusLine def download_folder_from_google_drive(link: str) -> int: takeout_domain = "https://takeout-pa.clients6.google.com" drive_id = link.split("?")[0].split("/")[-1] ses = requests.Session() ses.headers = { 'origin': 'https://drive.google.com', 'content-type': 'application/json', } key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE" # google anonymous key takeoutjs = ses.post(takeout_domain + "/v1/exports?key=%s" % (key), data='{"items":[{"id":"%s"}]}' % (drive_id)).json() takeoutid = str(takeoutjs["exportJob"]["id"]) storagePath = None while storagePath is None: succeededjson = ses.get(takeout_domain + "/v1/exports/%s?key=%s" % (takeoutid, key)).json()["exportJob"] if succeededjson["status"] == "SUCCEEDED": storagePath = str(succeededjson["archives"][0] ["storagePath"]) time.sleep(1) size = 0 for path, dirs, files in os.walk("./%s/Drive - %s" % (output, sanitize(i["title"]))): for f in files: fp = os.path.join(path, f) size += os.path.getsize(fp) try: if size >= int(succeededjson["archives"][0]["sizeOfContents"]): print(" %s already downloaded!" % (succeededjson["archives"][0] ["fileName"])) return 1 except Exception as e: print(" %s download failed! 
%s" % (succeededjson["archives"][0] ["fileName"], str(e))) response = ses.get(storagePath, stream=True) amountdone = 0 with open(succeededjson["archives"][0]["fileName"], "wb") as f: for chunk in response.iter_content(4096): if chunk: # filter out keep-alive new chunks f.write(chunk) amountdone += 4096 print(" downloading %s: %.2f%%" % (succeededjson["archives"][0]["fileName"], (amountdone / int(succeededjson["archives"][0] ["compressedSize"])) * 100), end="\r") print(" downloaded %s: 100.00%% " % (succeededjson["archives"][0]["fileName"])) unzip(succeededjson["archives"][0]["fileName"], "./%s/Drive - %s" % (output, sanitize(i["title"]))) os.remove(succeededjson["archives"][0]["fileName"]) return 0 def unzip(src_path: str, dst_dir: str, pwd: str = None) -> None: with zipfile.ZipFile(src_path) as zf: members = zf.namelist() for member in members: arch_info = zf.getinfo(member) arch_name = arch_info.filename.replace('/', os.path.sep) dst_path = os.path.join(dst_dir, arch_name) dst_path = os.path.normpath(dst_path) if not os.path.exists(dst_path): zf.extract(arch_info, dst_dir, pwd) def download_from_dropbox(link: str) -> None: responsehead = req.head(link.split("?")[0] + "?dl=1", allow_redirects=True) if responsehead.status_code == 404: print(" dropbox link not available!") return filename = urllib.parse.unquote( responsehead.headers["Content-Disposition"].split("'")[-1]) if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])): os.makedirs(output + "/Dropbox - " + sanitize(i["title"])) filepath = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]), sanitize(filename)) if os.path.exists(filepath): print(" file(s) already downloaded!") return filesize = os.stat(filepath).st_size if os.path.exists(filepath) else 0 # will always be 0 if it's a folder... if filesize == 0: with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r: r.raise_for_status() with open(filepath, "ab") as f: for chunk in r.iter_content(chunk_size=4096): f.write(chunk) filesize += 4096 print(" file %s downloading..." % (filename), end="\r") print(" file %s successfully downloaded!" % (filename)) if filename.endswith(".zip"): unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0])) os.remove(filepath) def download_from_youtube(link: str) -> int: # int is the response return 0 # just a stub for now # https://stackoverflow.com/a/39225039 def download_file_from_google_drive(drive_id: str, out: str = "") -> None: def get_confirm_token(response: requests.Response): for key, value in response.cookies.items(): if key.startswith('download_warning'): return value return None def save_response_content(response: requests.Response): amountdone = 0 CHUNK_SIZE = 4096 filename = sanitize( response.headers["Content-Disposition"].split("'")[-1]) folderpath = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out) if not os.path.exists(): os.makedirs(folderpath) destination = "%s/%s" % (folderpath, filename) filesize = (os.stat(destination).st_size if os.path.exists(destination) else 0) serverfilesize = int(response.headers["Content-Range"].split('/')[0]) if (os.path.exists(destination) and filesize == serverfilesize): print(" %s already downloaded!" 


def find_urls(s: str) -> list:
    url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" +
                 """%[0-9a-fA-F][0-9a-fA-F]))+""")
    urllist = []
    for findall in re.findall(url_regex, s):
        urllist.append(findall.split("<")[0].split(">")[-1])
    return urllist


def download_file(i: dict, x: dict, count: int) -> None:
    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count,
                                    sanitize(i["title"]), x["name"])
    amountdone = 0
    filesize = 0
    if os.path.exists(filename):
        filesize = os.path.getsize(filename)
    serverhead = req.head("https://kemono.party/data" + x['path'],
                          allow_redirects=True)
    for attempt in range(500):  # renamed from `i` to avoid shadowing the post dict
        serverfilesize = int(serverhead.headers["Content-Length"])
        if filesize < serverfilesize:
            amountdone += filesize
            with req.get("https://kemono.party/data" + x['path'], stream=True,
                         headers={"Range": "bytes=%d-" % (filesize)}) as r:
                r.raise_for_status()
                with open(filename, "ab") as f:
                    for chunk in r.iter_content(chunk_size=4096):
                        f.write(chunk)
                        amountdone += len(chunk)
                        print(" downloading image %d: %.2f%%"
                              % (count, (amountdone / serverfilesize) * 100),
                              end="\r")
            print(" downloaded image " + str(count) + ": 100.00%  ")
            return
        else:
            print(" image " + str(count) + " already downloaded!")
            return
        time.sleep(10)
    print(" download timed out!")
    return


def parse_json(i: dict, count: int) -> None:
    unique_urls = []
    for url in find_urls(i["content"]):  # spaghetti
        parsed_url = urllib.parse.urlparse(url)
        if parsed_url.netloc == "drive.google.com":
            if parsed_url.path.startswith("/drive/folders"):
                if url not in unique_urls:
                    download_folder_from_google_drive(url)
                    unique_urls.append(url)
            elif (parsed_url.path == "/open"
                  and parsed_url.query.startswith("id")):
                if url not in unique_urls:
                    download_file_from_google_drive(
                        parsed_url.query.split("=")[-1])
                    unique_urls.append(url)
            elif parsed_url.path.startswith("/file/"):
                if url not in unique_urls:
                    download_file_from_google_drive(
                        parsed_url.path.split("/")[-2])
                    unique_urls.append(url)
        elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
            if url not in unique_urls:
                download_from_dropbox(url)
                unique_urls.append(url)
        elif parsed_url.netloc in ["youtube.com", "youtu.be",
                                   "www.youtube.com"]:
            if url not in unique_urls:
                download_from_youtube(url)
                unique_urls.append(url)
    for x in i["attachments"]:
        count += 1
        while not os.path.exists("%s/%s_%dp_%s_%s"
                                 % (output, i["id"], count,
                                    sanitize(i["title"]), x["name"])):
            try:
                download_file(i, x, count)
            except (HTTPError, BadStatusLine):
                while 1:
                    time.sleep(10)
                    download_file(i, x, count)
            except Exception as e:
                print(e)
            time.sleep(timeout)
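
# Rough shape of the post objects parse_json() receives from the kemono.party
# API (reconstructed from the fields used above; the values are made up):
#   {
#       "id": "123456",
#       "title": "Example post",
#       "content": "<p>some html with embedded drive/dropbox/youtube links</p>",
#       "attachments": [{"name": "picture.png", "path": "/aa/bb/picture.png"}]
#   }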


def get_amount_of_posts(s: str, u: str):
    amount = 0
    while 1:
        data = req.get("https://kemono.party/api/%s/user/%s?o=%d"
                       % (s, u, amount)).json()
        if len(data) < 25:
            return math.ceil((amount + 1) / 25)
        amount += 25


args = docopt.docopt(__doc__)

req = requests.Session()

if args["--proxy"]:
    req.proxies = {
        "http": args["--proxy"],
        "https": args["--proxy"],
    }

timeout = int(args["--timeout"])

cj = http.cookiejar.MozillaCookieJar(args["--cookies"])
cj.load(ignore_expires=True)
req.cookies = cj

for url in args["<url>"]:
    if url.isnumeric():
        print("do not input user IDs in <url>! use a link instead")
        continue
    if url.split("/")[-2] == "post":
        service = url.split("/")[-5]
        user = url.split("/")[-3]
        post = url.split("/")[-1]
        pages = 1
    elif url.split("/")[-2] == "user":
        service = url.split("/")[-3]
        user = url.split("/")[-1]
        pages = get_amount_of_posts(service, user)
    output = "%s/%s-%s" % (args["--output"], service, user)
    if not os.path.exists(output):
        os.mkdir(output)
    for page in range(pages):
        try:
            post  # NameError here falls back to paginated fetching below
            userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s"
                               % (service, user, post)).json()
        except Exception:
            userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s"
                               % (service, user, (page * 25))).json()
        for i in userdata:
            print(i["id"])
            count = 0
            parse_json(i, count)
            filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count,
                                                   sanitize(i["title"]))
            with open(filename, "w") as f:
                json.dump(i, f)
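
# For reference, the API endpoints used above page posts 25 at a time via the
# ?o= offset parameter, e.g. (made-up service and user id):
#   https://kemono.party/api/gumroad/user/12345?o=0    first 25 posts
#   https://kemono.party/api/gumroad/user/12345?o=25   next 25 posts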