codedump: changeset 98:e4bf37150a3f
kemonopartydownloader.py: lint, use docopt instead of argparse, maybe some other changes

author:    Paper <37962225+mrpapersonic@users.noreply.github.com>
committer: GitHub <noreply@github.com>
date:      Sun, 14 Aug 2022 05:29:01 -0400
parents:   f1f4f6da04bd
children:  2bccbf473ff4
files:     kemonopartydownloader.py
diffstat:  1 files changed, 154 insertions(+), 145 deletions(-)
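Note: the commit swaps argparse for docopt, which derives the command-line interface from the module docstring and returns a plain dict of parsed values. Below is a minimal standalone sketch of that pattern (not part of this changeset); it assumes docopt 0.6.x is installed, and the file name example.py plus the trimmed option list are illustrative only, using the --opt=<arg> spelling from the docopt documentation rather than the exact usage string added in the diff.

"""
Usage:
  example.py <url>... --cookies=<filename> [--output=<folder>]
  example.py -h | --help

Options:
  -h --help              Show this screen
  --cookies=<filename>   A Netscape-compatible cookies.txt file
  --output=<folder>      Output folder
"""
import docopt

if __name__ == "__main__":
    # docopt() matches sys.argv against the docstring above and returns a
    # dict: args["<url>"] is a list (because of "..."), while
    # args["--cookies"] and args["--output"] hold the option values or None.
    args = docopt.docopt(__doc__)
    for url in args["<url>"]:
        print(url, args["--cookies"], args["--output"])

Running `python example.py https://a https://b --cookies=cookies.txt` would yield args equivalent to {'<url>': ['https://a', 'https://b'], '--cookies': 'cookies.txt', '--output': None, '--help': False}.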
--- a/kemonopartydownloader.py	Sat Aug 13 20:27:58 2022 -0400
+++ b/kemonopartydownloader.py	Sun Aug 14 05:29:01 2022 -0400
@@ -1,4 +1,20 @@
-import argparse
+"""
+Usage:
+  kemonopartydownloader.py <url>... (--cookies <filename>)
+                                    [--output <folder>]
+                                    [--proxy <proxy>]
+  kemonopartydownloader.py -h | --help
+
+Arguments:
+  <url>  Kemono.party URL to download from
+
+Options:
+  -h --help                Show this screen
+  -c --cookies <filename>  A Netscape-compatible cookies.txt file
+  -o --output <folder>     Output folder, relative to the current directory
+  -p --proxy <proxy>       HTTP or HTTPS proxy (SOCKS5 with PySocks)
+"""
+import docopt
 import http.cookiejar
 import os
 import re
@@ -7,52 +23,61 @@
 import math
 import zipfile
 import urllib.parse
-import sys
 from urllib.error import HTTPError
 from http.client import BadStatusLine
 
 
-def under_num(maximum: int, num: int) -> int:
-    return num if num <= maximum else maximum
-
 def download_folder_from_google_drive(link: str) -> int:
-    session = requests.Session()
-    session.headers = {
+    takeout_domain = "https://takeout-pa.clients6.google.com"
+    drive_id = link.split("?")[0].split("/")[-1]
+    ses = requests.Session()
+    ses.headers = {
         'origin': 'https://drive.google.com',
         'content-type': 'application/json',
     }
     key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE"  # google anonymous key
-    takeoutjs = session.post("https://takeout-pa.clients6.google.com/v1/exports?key=%s" % (key), data='{"items":[{"id":"%s"}]}' % (link.split("?")[0].split("/")[-1])).json()
+    takeoutjs = ses.post(takeout_domain + "/v1/exports?key=%s" % (key),
+                         data='{"items":[{"id":"%s"}]}' % (drive_id)).json()
     takeoutid = str(takeoutjs["exportJob"]["id"])
     storagePath = None
     while storagePath is None:
-        succeededjson = session.get("https://takeout-pa.clients6.google.com/v1/exports/%s?key=%s" % (takeoutid, key)).json()
-        if succeededjson["exportJob"]["status"] == "SUCCEEDED":
-            storagePath = str(succeededjson["exportJob"]["archives"][0]["storagePath"])
+        succeededjson = ses.get(takeout_domain + "/v1/exports/%s?key=%s"
+                                % (takeoutid, key)).json()["exportJob"]
+        if succeededjson["status"] == "SUCCEEDED":
+            storagePath = str(succeededjson["archives"][0]
+                              ["storagePath"])
        time.sleep(1)
     size = 0
-    for path, dirs, files in os.walk("./%s/Drive - %s" % (output, sanitize(i["title"]))):
+    for path, dirs, files in os.walk("./%s/Drive - %s"
+                                     % (output, sanitize(i["title"]))):
         for f in files:
             fp = os.path.join(path, f)
             size += os.path.getsize(fp)
     try:
-        if size >= int(succeededjson["exportJob"]["archives"][0]["sizeOfContents"]):
-            print(" %s already downloaded!" % (succeededjson["exportJob"]["archives"][0]["fileName"]))
+        if size >= int(succeededjson["archives"][0]["sizeOfContents"]):
+            print(" %s already downloaded!" % (succeededjson["archives"][0]
+                                               ["fileName"]))
             return 1
     except Exception as e:
-        print(" %s download failed! %s" % (succeededjson["exportJob"]["archives"][0]["fileName"], str(e)))
-        print(e)
-    response = session.get(storagePath, stream=True)
+        print(" %s download failed! %s" % (succeededjson["archives"][0]
+                                           ["fileName"], str(e)))
+    response = ses.get(storagePath, stream=True)
     amountdone = 0
-    with open(succeededjson["exportJob"]["archives"][0]["fileName"], "wb") as f:
-        for chunk in response.iter_content(1024):
+    with open(succeededjson["archives"][0]["fileName"], "wb") as f:
+        for chunk in response.iter_content(4096):
             if chunk:  # filter out keep-alive new chunks
                 f.write(chunk)
-                amountdone += 1024
-                print(" downloading %s: %.2f%%" % (succeededjson["exportJob"]["archives"][0]["fileName"], (amountdone / int(succeededjson['exportJob']['archives'][0]['compressedSize'])) * 100), end="\r")
-    print(" downloaded %s: 100.00%% " % (succeededjson["exportJob"]["archives"][0]["fileName"]))
-    unzip(succeededjson["exportJob"]["archives"][0]["fileName"], "./%s/Drive - %s" % (output, sanitize(i["title"])))
-    os.remove(succeededjson["exportJob"]["archives"][0]["fileName"])
+                amountdone += 4096
+                print(" downloading %s: %.2f%%"
+                      % (succeededjson["archives"][0]["fileName"],
+                         (amountdone / int(succeededjson["archives"][0]
+                                           ["compressedSize"])) * 100), end="\r")
+    print(" downloaded %s: 100.00%% "
+          % (succeededjson["archives"][0]["fileName"]))
+    unzip(succeededjson["archives"][0]["fileName"], "./%s/Drive - %s"
+          % (output,
+             sanitize(i["title"])))
+    os.remove(succeededjson["archives"][0]["fileName"])
     return 0
@@ -73,29 +98,34 @@
     if responsehead.status_code == 404:
         print(" dropbox link not available!")
         return
+    filename = urllib.parse.unquote(
+        responsehead.headers["Content-Disposition"].split("'")[-1])
     if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])):
         os.makedirs(output + "/Dropbox - " + sanitize(i["title"]))
-    filename = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]), sanitize(responsehead.headers["Content-Disposition"].split("'")[-1]))
-    if os.path.exists(urllib.parse.unquote(os.path.splitext(filename)[0])) and os.path.isdir(urllib.parse.unquote(os.path.splitext(filename)[0])):
+    filepath = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]),
+                                       sanitize(filename))
+    if os.path.exists(filepath):
         print(" file(s) already downloaded!")
         return
-    filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
+    filesize = os.stat(filepath).st_size if os.path.exists(filepath) else 0
     # will always be 0 if it's a folder...
     if filesize == 0:
-        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
+        with req.get(link.split("?")[0] + "?dl=1", stream=True,
+                     headers={"Range": "bytes=%d-" % (filesize)}) as r:
             r.raise_for_status()
-            with open(filename, "ab") as f:
+            with open(filepath, "ab") as f:
                 for chunk in r.iter_content(chunk_size=4096):
                     f.write(chunk)
                     filesize += 4096
-                    print(" file %s downloading..." % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])), end="\r")
-    print(" file %s successfully downloaded!" % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
-    if responsehead.headers["Content-Disposition"].split("'")[-1].endswith(".zip"):
-        unzip(filename, urllib.parse.unquote(os.path.splitext(filename)[0]))
-        os.remove(filename)
+                    print(" file %s downloading..." % (filename), end="\r")
+    print(" file %s successfully downloaded!" % (filename))
+    if filename.endswith(".zip"):
+        unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0]))
+        os.remove(filepath)
 
 
-def download_file_from_google_drive(drive_id: str, out: str = "") -> None:  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039
+# https://stackoverflow.com/a/39225039
+def download_file_from_google_drive(drive_id: str, out: str = "") -> None:
     def get_confirm_token(response: requests.Response):
         for key, value in response.cookies.items():
             if key.startswith('download_warning'):
@@ -106,14 +136,19 @@
     def save_response_content(response: requests.Response):
         amountdone = 0
         CHUNK_SIZE = 4096
-        filename = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
+        filename = sanitize(
+            response.headers["Content-Disposition"].split("'")[-1])
+        folderpath = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
         if not os.path.exists():
-            os.makedirs(filename)
-        destination = filename + "/" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
-        filesize = os.stat(destination).st_size if os.path.exists(destination) else 0
+            os.makedirs(folderpath)
+        destination = "%s/%s" % (folderpath, filename)
+        filesize = (os.stat(destination).st_size
+                    if os.path.exists(destination)
+                    else 0)
+        serverfilesize = int(response.headers["Content-Range"].split('/')[0])
 
-        if os.path.exists(destination) and filesize == int(response.headers["Content-Range"].partition('/')[-1]):
-            print(" " + os.path.basename(destination) + " already downloaded!")
+        if (os.path.exists(destination) and filesize == serverfilesize):
+            print(" %s already downloaded!" % os.path.basename(destination))
             return
 
         with open(destination, "wb") as f:
@@ -121,8 +156,11 @@
                 if chunk:  # filter out keep-alive new chunks
                     f.write(chunk)
                     amountdone += CHUNK_SIZE
-                    print(" downloading %s: %.2f%%" % (os.path.basename(destination), (amountdone / int(response.headers["Content-Range"].partition('/')[-1]))), end="\r")
-        print(" downloaded %s: %.2f%% " % (os.path.basename(destination), 100.0))
+                    print(" downloading %s: %.2f%%"
+                          % (os.path.basename(destination),
+                             (amountdone / serverfilesize)), end="\r")
+        print(" downloaded %s: %.2f%% "
+              % (os.path.basename(destination), 100.0))
 
     URL = "https://docs.google.com/uc?export=download"
 
@@ -134,11 +172,13 @@
 
     session.proxies = req.proxies
 
-    response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
+    response = session.get(URL, headers=headers, params={'id': drive_id},
+                           stream=True)
 
     while response.status_code == 403:
         time.sleep(30)
-        response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
+        response = session.get(URL, headers=headers, params={'id': drive_id},
+                               stream=True)
 
     if response.status_code == 404:
         return  # bypass when root folder has no files
@@ -147,7 +187,8 @@
 
     if token:
         params = {'id': drive_id, 'confirm': token}
-        response = session.get(URL, headers=headers, params=params, stream=True)
+        response = session.get(URL, headers=headers, params=params,
+                               stream=True)
 
     save_response_content(response)
 
@@ -157,27 +198,35 @@
 
 
 def find_urls(s: str) -> list:
+    url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" +
+                 """%[0-9a-fA-F][0-9a-fA-F]))+""")
     urllist = []
-    for findall in re.findall(r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+""", s):
+    for findall in re.findall(url_regex, s):
        urllist.append(findall.split("<")[0].split(">")[-1])
     return urllist
 
 
 def download_file(i: dict, x: dict, count: int) -> None:
-    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count, sanitize(i["title"]), x["name"])
+    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count,
+                                    sanitize(i["title"]), x["name"])
     amountdone = 0
     filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
-    serverhead = req.head("https://kemono.party/data" + x['path'], allow_redirects=True)
+    serverhead = req.head("https://kemono.party/data" + x['path'],
+                          allow_redirects=True)
     for i in range(500):
         serverfilesize = int(serverhead.headers["Content-Length"])
         if filesize < serverfilesize:
-            with req.get("https://kemono.party/data" + x['path'], stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
+            amountdone += filesize
+            with req.get("https://kemono.party/data" + x['path'], stream=True,
+                         headers={"Range": "bytes=%d-" % (filesize)}) as r:
                 r.raise_for_status()
                 with open(filename, "ab") as f:
                     for chunk in r.iter_content(chunk_size=4096):
                         f.write(chunk)
                         amountdone += len(chunk)
-                        print(" downloading image %d: %.2f%%" % (count, under_num(100, round(((filesize + amountdone) / serverfilesize) * 100, 2))), end="\r")
+                        print(" downloading image %d: %.2f%%"
+                              % (count, (amountdone / serverfilesize) * 100),
+                              end="\r")
             print(" downloaded image " + str(count) + ": 100.00% ")
             return
         else:
@@ -189,42 +238,35 @@
 
 
 def parse_json(i: dict, count: int) -> None:
-    seen_gdrive_ids = set()
     unique_urls = []
     for url in find_urls(i["content"]):
         parsed_url = urllib.parse.urlparse(url)
         if parsed_url.netloc == "drive.google.com":
             if parsed_url.path.startswith("drive/folders"):
-                if parsed_url.path.split("/")[-1] not in seen_gdrive_ids:
+                if url not in unique_urls:
+                    download_folder_from_google_drive(url)
                     unique_urls.append(url)
-                    seen_gdrive_ids.add(parsed_url.path.split("/")[-1])
-            elif parsed_url.path == "open" and parsed_url.query.startswith == "id":
-                if parsed_url.query.split("=")[-1] not in seen_gdrive_ids:
-                    unique_urls.append(req.head(url).headers["Location"], allow_redirects=True)
-                    seen_gdrive_ids.add(parsed_url.query.split("=")[-1])
+            elif (parsed_url.path == "open" and
+                  parsed_url.query.startswith == "id"):
+                if url not in unique_urls:
+                    download_file_from_google_drive(
+                        parsed_url.query.split("=")
+                        [-1])
+                    unique_urls.append(url)
             elif parsed_url.path.startswith("file/"):
-                if parsed_url.path.split("/")[-2] not in seen_gdrive_ids:
+                if url not in unique_urls:
+                    download_file_from_google_drive(parsed_url.path.split("/")
+                                                    [-2])
                     unique_urls.append(url)
-                    seen_gdrive_ids.add(url.split("?")[0].split("/")[-2])
         elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
             if url not in unique_urls:
+                download_from_dropbox(url)
                 unique_urls.append(url)
-        else:
-            pass
-    for url in unique_urls:
-        if url.startswith("https://drive.google.com/drive/folders/"):
-            # Google Drive folder downloading
-            print(" Google Drive link found! attempting to download its files...")
-            download_folder_from_google_drive(url)
-        elif url.startswith("https://drive.google.com/file/"):
-            print(" Google Drive link found! attempting to download its files...")
-            download_file_from_google_drive(url.split("?")[0].split("/")[-2])
-        elif url.startswith("https://www.dropbox.com/"):
-            print(" Dropbox link found! attempting to download its files...")
-            download_from_dropbox(url)
     for x in i["attachments"]:
         count += 1
-        while not os.path.exists("%s/%d_%dp_%s_%s" % (output, int(i["id"]) - 1, count, sanitize(i["title"]), x["name"])):
+        while not os.path.exists("%s/%s_%dp_%s_%s"
+                                 % (output, i["id"], count,
                                    sanitize(i["title"]), x["name"])):
            try:
                download_file(i, x, count)
                break
@@ -240,89 +282,56 @@
 def get_amount_of_posts(s: str, u: str):
     amount = 0
     while 1:
-        data = req.get("https://kemono.party/api/%s/user/%s?o=%d" % (s, u, amount)).json()
+        data = req.get("https://kemono.party/api/%s/user/%s?o=%d"
+                       % (s, u, amount)).json()
         if len(data) < 25:
             return math.ceil((amount + 1) / 25)
         amount += 25
 
 
-parser = argparse.ArgumentParser(description="Downloads files from kemono.party")
-parser.add_argument("-u", "--url", help="user URL", metavar='<url>', required=True)
-parser.add_argument("-c", "--cookies", help="", metavar='<cookies>', required=True)  # required because of DDoS-GUARD
-parser.add_argument("-p", "--proxy", help="proxy\n supported types: http, https, socks5 (requires pysocks)", metavar='<proxy>')  # SOCKS proxy support is through PySocks - pip install pysocks
-parser.add_argument("-o", "--output", help="output folder, defaults to user ID", metavar='<output>')
-parser.add_argument("--test-download-services", dest="testdownloadservices", action="store_true", help="test download services")
-args = parser.parse_args()
+args = docopt.docopt(__doc__)
 
 req = requests.Session()
 
-if args.testdownloadservices:
-    output = "download_services_test"
-    i = {"title": "DEEZ NUTS"}
-    download_folder_from_google_drive("https://drive.google.com/drive/folders/1rZN2ejZnGdF0EpaZuknlDp26a0qSjsEI")
-    download_from_dropbox("https://www.dropbox.com/s/yg405bpznyobo3u/test.txt?dl=0")  # File
-    download_from_dropbox("https://www.dropbox.com/sh/ne3c7bxtkt5tg4s/AABYPNGfHoil4HO_btudw0wPa?dl=0")  # Folder
-    exit()
+if args["--proxy"]:
+    req.proxies = {
+        "http": args["--proxy"],
+        "https": args["--proxy"],
+    }
 
-if args.proxy:
-    req.proxies = {}
-    if args.proxy[:6] == "socks5":
-        httpproxy = args.proxy
-        httpsproxy = args.proxy
-    elif args.proxy[:5] == "https":
-        httpsproxy = args.proxy
-    elif args.proxy[:4] == "http":
-        httpproxy = args.proxy
-    else:
-        print("unknown proxy format! defaulting to HTTP...")
-        httpproxy = args.proxy
-    if httpproxy:
-        req.proxies["http"] = httpproxy
-    if httpsproxy:
-        req.proxies["https"] = httpsproxy
-
-cj = http.cookiejar.MozillaCookieJar(args.cookies)
+cj = http.cookiejar.MozillaCookieJar(args["--cookies"])
 cj.load(ignore_expires=True)
 req.cookies = cj
 
-try:
-    int(args.url)
-    print("do not input user IDs in --url! use a link instead")
-    exit()
-except Exception:
-    pass
+for url in args["<url>"]:
+    if url.isnumeric():
+        print("do not input user IDs in --url! use a link instead")
+        continue
 
-# TODO: use urlparse here...
-if args.url.split("/")[-2] == "post":
-    service = args.url.split("/")[-5]
-    user = args.url.split("/")[-3]
-    post = args.url.split("/")[-1]
-elif args.url.split("/")[-2] == "user":
-    service = args.url.split("/")[-3]
-    user = args.url.split("/")[-1]
-
-if not args.output:
-    output = "%s-%s" % (service, user)
-else:
-    output = args.output
+    if url.split("/")[-2] == "post":
+        service = url.split("/")[-5]
+        user = url.split("/")[-3]
+        post = url.split("/")[-1]
+        pages = 1
+    elif url.split("/")[-2] == "user":
+        service = url.split("/")[-3]
+        user = url.split("/")[-1]
+        pages = get_amount_of_posts(service, user)
 
-if not os.path.isdir(output):
-    if os.path.exists(output):
-        os.remove(output)
-    os.makedirs(output)
+    output = ""
+    if args["--output"]:
+        output = args.output + "/"
+    output += "%s-%s" % (service, user)
 
-try:
-    post
-    pages = 1
-except Exception:
-    pages = get_amount_of_posts(service, user)
-for page in range(pages):
-    try:
-        post
-        userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s" % (service, user, post)).json()
-    except Exception:
-        userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s" % (service, user, (page * 25))).json()
-    for i in userdata:
-        print(i["id"])
-        count = 0
-        parse_json(i, count)
+    for page in range(pages):
+        try:
+            post
+            userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s"
+                               % (service, user, post)).json()
+        except Exception:
+            userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s"
+                               % (service, user, (page * 25))).json()
        for i in userdata:
            print(i["id"])
            count = 0
            parse_json(i, count)
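For context on the download loops touched above: the script resumes partial files by sending an HTTP Range header for the bytes already on disk and appending the streamed response to the existing file. A minimal standalone sketch of that pattern with requests follows; the URL and file name are placeholders, not values from the changeset, and it assumes the server honors Range requests (replying 206 with only the missing bytes).

import os
import requests

def resume_download(url: str, filename: str) -> None:
    # Start from however many bytes are already on disk.
    offset = os.stat(filename).st_size if os.path.exists(filename) else 0
    headers = {"Range": "bytes=%d-" % offset}
    with requests.get(url, stream=True, headers=headers) as r:
        r.raise_for_status()
        # "ab" appends, so the partial file is extended rather than rewritten.
        with open(filename, "ab") as f:
            for chunk in r.iter_content(chunk_size=4096):
                f.write(chunk)

resume_download("https://example.com/file.bin", "file.bin")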