changeset 54:5a5d47a795c6
Update kemonopartydownloader.py
how to easily make your code thrice as long
committer | GitHub <noreply@github.com>
author    | Paper <37962225+mrpapersonic@users.noreply.github.com>
date      | Fri, 06 Aug 2021 02:50:53 -0400
parents   | ae64a0c8831b
children  | 4e5000c9b48f
files     | kemonopartydownloader.py
diffstat  | 1 files changed, 243 insertions(+), 35 deletions(-)
line diff
--- a/kemonopartydownloader.py Sat Jul 31 03:11:58 2021 -0400
+++ b/kemonopartydownloader.py Fri Aug 06 02:50:53 2021 -0400
@@ -1,15 +1,137 @@
 # example args.url: https://kemono.party/fanbox/user/5375435/post/2511461
+# created by Paper in 2021
+# please do not share without crediting me!
 import argparse
 import http.cookiejar
 import os
 import re
 import requests  # pip install requests
 import time
+import math
+import zipfile
+import urllib.parse
 from urllib.error import HTTPError
+
+
+def get_google_drive_subfolder_ids(link):
+    gdrive = requests.get(link).text
+    drivefiles = re.findall(r"\[\"(.{33}?)\",\[\"(.{33}?)\"\],\"(.+?)\",\"(.+?)\"", gdrive)  # format: ["id","name","mimetype"
+    seen = set()
+    unique_ids = []
+    for files in drivefiles:
+        if files[3] != "application/vnd.google-apps.folder":
+            continue
+        if files[0] not in seen:
+            unique_ids.append(files[0])
+            seen.add(files[0])
+    return unique_ids
+
+
+def unzip(src_path, dst_dir, pwd=None):
+    with zipfile.ZipFile(src_path) as zf:
+        members = zf.namelist()
+        for member in members:
+            arch_info = zf.getinfo(member)
+            arch_name = arch_info.filename.replace('/', os.path.sep)
+            dst_path = os.path.join(dst_dir, arch_name)
+            dst_path = os.path.normpath(dst_path)
+            if not os.path.exists(dst_path):
+                zf.extract(arch_info, dst_dir, pwd)
+
+
+def download_from_dropbox(link):
+    responsehead = requests.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
+    if responsehead.status_code == 404:
+        print(" dropbox link not available!")
+        return
+    if not os.path.exists(output + "\\Dropbox - " + sanitize(i["title"])):
+        os.makedirs(output + "\\Dropbox - " + sanitize(i["title"]))
+    filename = output + "\\Dropbox - " + sanitize(i["title"]) + "\\" + sanitize(responsehead.headers["Content-Disposition"].split("'")[-1])
+    if os.path.exists(urllib.parse.unquote(os.path.splitext(filename)[0])) and os.path.isdir(urllib.parse.unquote(os.path.splitext(filename)[0])):
+        print(" file(s) already downloaded!")
+        return
+    if os.path.exists(filename):
+        filesize = os.stat(filename).st_size
+    else:
+        filesize = 0
+    serverfilesize = int(responsehead.headers["Content-Length"])
+    if filesize < serverfilesize:
+        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+            r.raise_for_status()
+            with open(filename, "ab") as f:
+                for chunk in r.iter_content(chunk_size=4096):
+                    f.write(chunk)
+                    filesize += 4096
+                    print(" file {0} downloading: ".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])) + str(round((filesize / serverfilesize) * 100)) + "%\r", end="")
+        print(" {0} successfully downloaded!".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
+    if responsehead.headers["Content-Disposition"].split("'")[-1].endswith(".zip"):
+        unzip(filename, urllib.parse.unquote(os.path.splitext(filename)[0]))
+        os.remove(filename)
+
+
+def download_file_from_google_drive(id):  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039 ;)
+    def get_confirm_token(response):
+        for key, value in response.cookies.items():
+            if key.startswith('download_warning'):
+                return value
+
+        return None
+
+    def save_response_content(response):
+        amountdone = 0
+        CHUNK_SIZE = 32768
+        if not os.path.exists(output + "\\Drive - " + sanitize(i["title"])):
+            os.makedirs(output + "\\Drive - " + sanitize(i["title"]))
+        destination = output + "\\Drive - " + sanitize(i["title"]) + "\\" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
+        if os.path.exists(destination):
+            filesize = os.stat(destination).st_size
+        else:
+            filesize = 0
+
+        if os.path.exists(destination):
+            print(" " + os.path.basename(destination) + " already downloaded!")
+            return
+
+        with open(destination, "wb") as f:
+            for chunk in response.iter_content(CHUNK_SIZE):
+                if chunk:  # filter out keep-alive new chunks
+                    f.write(chunk)
+                    amountdone += CHUNK_SIZE
+                    print(" downloading {0}: ".format(os.path.basename(destination)) + " " + str(round(filesize + amountdone / int(response.headers["Content-Range"].partition('/')[-1])) * 100) + "%\r", end="")
+            print(" downloaded {0}".format(os.path.basename(destination)) + ": 100% ")
+
+    URL = "https://docs.google.com/uc?export=download"
+
+    session = requests.Session()
+
+    headers = {
+        "Range": "bytes=0-",
+    }
+
+    response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+
+    while response.status_code == "403":
+        time.sleep(30)
+        response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+
+    token = get_confirm_token(response)
+
+    if token:
+        params = {'id': id, 'confirm': token}
+        response = session.get(URL, headers=headers, params=params, stream=True)
+
+    save_response_content(response)
 
 
 def sanitize(filename):
-    return re.sub(r"[\/:*?\"<>|]", "_", filename)
+    return re.sub(r"[\/:*?\"<>|]", "_", filename).strip()
+
+
+def find_urls(s):
+    urllist = []
+    for findall in re.findall("href=\\\"(https://.+?)\\\"", s):
+        urllist.append(re.sub(r"<[^<]+?>", "", re.sub(r"[^a-zA-Z0-9<>]+$", "", findall)))
+    return urllist
 
 
 def downloadfile(i, x, count):
@@ -18,24 +140,94 @@
         filesize = os.stat(filename).st_size
     else:
         filesize = 0
-    if str(filesize) != req.head(f"https://data.kemono.party{x['path']}").headers["Content-Length"]:
-        with req.get(f"https://data.kemono.party{x['path']}", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
-            r.raise_for_status()
-            with open(filename, "ab") as f:
-                for chunk in r.iter_content(chunk_size=4096):
-                    f.write(chunk)
-        print("image " + str(count) + " successfully downloaded!")
-        return
-    else:
-        print("image " + str(count) + " already downloaded!")
-        return
+    serverhead = req.head("https://data.kemono.party" + x['path'])
+    try:
+        serverfilesize = int(serverhead.headers["Content-Length"])
+        if filesize < serverfilesize:
+            with req.get(f"https://data.kemono.party{x['path']}", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+                r.raise_for_status()
+                with open(filename, "ab") as f:
+                    for chunk in r.iter_content(chunk_size=4096):
+                        f.write(chunk)
+            print(" image " + str(count) + " successfully downloaded!")
+            return
+        else:
+            print(" image " + str(count) + " already downloaded!")
+            return
+    except Exception as e:
+        print(" error downloading file!")
+        print(e)
+
+
+def parse_json(i, count):
+    seen = set()
+    unique_urls = []
+    for url in find_urls(i["content"]):
+        if url.startswith("https://drive.google.com/drive/folders"):
+            if url.split("/")[-1].split("?")[0] not in seen:
+                unique_urls.append(url)
+                seen.add(url.split("/")[-1].split("?")[0])
+        elif url.startswith("https://drive.google.com/open?id="):
+            if url.split("?id=")[-1] not in seen:
+                unique_urls.append(requests.head(url).headers["Location"])
+                seen.add(url.split("/")[-1].split("?")[0])
+        elif url.startswith("https://drive.google.com/file/"):
+            if url.split("?")[0].split("/")[-2] not in seen:
+                unique_urls.append(url)
+                seen.add(url.split("?")[0].split("/")[-2])
+        elif url.startswith("https://www.dropbox.com"):
+            download_from_dropbox(url)
+        else:  # TODO: add MEGA, or some sort of other file hosting website(s). gdrive and dropbox seem like the most popular ones atm
+            pass
+    for url in unique_urls:
+        if url.startswith("https://drive.google.com/drive/folders/"):
+            # Google Drive folder downloading
+            # NOTE: this doesn't currently support subfolders! they seem like a pain in the ass to implement without the api...
+            print(" Google Drive link found! attempting to download its files...")
+            unique_ids = [url.split("?")[0].split("/")[-1]]
+            drive_ids_to_download = []
+            while len(unique_ids) > 0:
+                for myid in unique_ids:
+                    unique_ids = get_google_drive_subfolder_ids("https://drive.google.com/drive/folders/" + myid)
+                    for ids in unique_ids:
+                        drive_ids_to_download.append(ids)
+            for ids in drive_ids_to_download:
+                gdrive = requests.get("https://drive.google.com/drive/folders/" + ids).text
+                driveids = re.findall(r'jsdata=" M2rjcd;_;\d (?:.+?);(.+?);', gdrive)
+                for driveid in driveids:
+                    if not driveid.startswith("driveweb|"):
+                        download_file_from_google_drive(driveid)
+        elif url.startswith("https://drive.google.com/file/"):
+            download_file_from_google_drive(url.split("?")[0].split("/")[-2])
+    for x in i["attachments"]:
+        count += 1
+        while not os.path.exists("{4}\\{0}_{1}p_{2}_{3}".format(int(i["id"]) - 1, count, sanitize(i["title"]), os.path.basename(x["path"]), output)):
+            try:
+                downloadfile(i, x, count)
+                break
+            except HTTPError:
+                time.sleep(10)
+                downloadfile(i, x, count)
+            except Exception as e:
+                print(e)
+                time.sleep(10)
+
+
+def get_amount_of_posts(s, u):
+    amount = 0
+    while 1:
+        data = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(s, u, amount)).json()
+        if len(data) < 25:
+            return math.ceil((amount + 1) / 25)
+        amount += 25
 
 
 parser = argparse.ArgumentParser(description="Downloads (deleted) videos from YTPMV creators")
 parser.add_argument("-u", "--url", help="user URL", metavar='<url>', required=True)
-parser.add_argument("-c", "--cookies", help="", metavar='<url>', required=True)  # required because of DDoS-GUARD
-parser.add_argument("-p", "--proxy", help="proxy\n supported types: http, https, socks5 (requires pysocks)", metavar='<url>')  # SOCKS proxy support is through PySocks - pip install pysocks
-parser.add_argument("-o", "--output", help="output folder, defaults to user ID", metavar='<url>')
+parser.add_argument("-c", "--cookies", help="", metavar='<cookies>', required=True)  # required because of DDoS-GUARD
parser.add_argument("-p", "--proxy", help="proxy\n supported types: http, https, socks5 (requires pysocks)", metavar='<proxy>')  # SOCKS proxy support is through PySocks - pip install pysocks
+parser.add_argument("-o", "--output", help="output folder, defaults to user ID", metavar='<output>')
+parser.add_argument("--test-download-services", dest="testdownloadservices", nargs="+", help="test download services\nsupported: gdrive, dropbox", metavar="<service>")
 args = parser.parse_args()
 
 req = requests.Session()
@@ -63,7 +255,7 @@
 
 try:
     int(args.url)
-    print("do not input user IDs here! use a link instead")
+    print("do not input user IDs in --url! use a link instead")
     exit()
 except Exception:
     pass
@@ -72,11 +264,9 @@
     service = args.url.split("/")[-5]
     user = args.url.split("/")[-3]
    post = args.url.split("/")[-1]
-    userdata = req.get("https://kemono.party/api/{0}/user/{1}/post/{2}".format(service, user, post)).json()
 elif args.url.split("/")[-2] == "user":
     service = args.url.split("/")[-3]
     user = args.url.split("/")[-1]
-    userdata = req.get("https://kemono.party/api/{0}/user/{1}".format(service, user)).json()
 
 if not args.output:
     output = user
@@ -86,21 +276,39 @@
 if not os.path.isdir(output):
     if os.path.exists(output):
         os.remove(output)
-    os.mkdir(output)
+    os.makedirs(output)
 
-for i in userdata:
-    print(i["id"])
-    post = i["id"]
-    count = 0
-    for x in i["attachments"]:
-        count += 1
-        while not os.path.exists("{4}\\{0}_{1}p_{2}_{3}".format(int(i["id"]) - 1, count, sanitize(i["title"]), os.path.basename(x["path"]), output)):
-            try:
-                downloadfile(i, x, count)
-                break
-            except HTTPError:
-                time.sleep(10)
-                downloadfile(i, x, count)
-            except Exception as e:
-                print(e)
-                time.sleep(10)
+if args.testdownloadservices:
+    i = {
+        "title": "Test"
+    }
+    if "gdrive" in args.testdownloadservices:
+        unique_ids = ["1sMVOcUesv4Ua_KJ-eQ_CMS_5KkrZGFdF"]
+        drive_ids_to_download = [unique_ids[0].split("?")[0].split("/")[-1]]
+        while len(unique_ids) > 0:
+            for i in unique_ids:
+                unique_ids = get_google_drive_subfolder_ids("https://drive.google.com/drive/folders/" + i)
+                for ids in unique_ids:
+                    drive_ids_to_download.append(ids)
+        print(drive_ids_to_download)
+    if "dropbox" in args.testdownloadservices:
+        download_from_dropbox("https://www.dropbox.com/s/yg405bpznyobo3u/test.txt?dl=0")  # File
+        download_from_dropbox("https://www.dropbox.com/sh/ne3c7bxtkt5tg4s/AABYPNGfHoil4HO_btudw0wPa?dl=0")  # Folder
+    exit()
+
+try:
+    post
+    pages = 1
+except Exception:
+    pages = get_amount_of_posts(service, user)
+for page in range(pages):
+    try:
+        post
+        userdata = req.get("https://kemono.party/api/{0}/user/{1}/post/{2}".format(service, user, post)).json()
+    except Exception:
+        userdata = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(service, user, (page * 25))).json()
+    for i in userdata:
+        print(i["id"])
+        post = i["id"]
+        count = 0
+        parse_json(i, count)
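
The resumable-download pattern used in both download_from_dropbox and the reworked downloadfile boils down to: stat the partial file on disk, compare against the server's Content-Length, and request only the missing bytes with an HTTP Range header while appending to the file. A minimal standalone sketch of that pattern, assuming the server honors Range requests; the URL and filename below are placeholders, not part of the script:

    import os
    import requests

    def resume_download(url, filename):
        # start from however many bytes already made it to disk
        filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
        serverfilesize = int(requests.head(url, allow_redirects=True).headers["Content-Length"])
        if filesize >= serverfilesize:
            return  # nothing left to fetch
        # ask only for the missing suffix and append it to the partial file
        with requests.get(url, stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
            r.raise_for_status()
            with open(filename, "ab") as f:
                for chunk in r.iter_content(chunk_size=4096):
                    f.write(chunk)

    resume_download("https://example.com/big.bin", "big.bin")  # hypothetical URL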
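The Dropbox handling relies on one trick: stripping the query string from a share link and re-appending ?dl=1 turns the HTML preview page into a direct file response, whose name arrives in the Content-Disposition header. A quick sketch of checking that behavior, using the test-file link already present in the patch:

    import requests

    link = "https://www.dropbox.com/s/yg405bpznyobo3u/test.txt?dl=0"
    head = requests.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
    # 200 plus a Content-Disposition filename means the link is downloadable;
    # 404 is what download_from_dropbox treats as "link not available"
    print(head.status_code, head.headers.get("Content-Disposition"))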
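download_file_from_google_drive follows the Stack Overflow recipe it links: request docs.google.com/uc?export=download, and if Drive answers with its "can't scan this file for viruses" interstitial, pull the token from the download_warning cookie and retry with confirm=<token>. The core handshake reduced to a sketch, assuming Drive still serves that cookie as it did in 2021; file_id and out_path are placeholders:

    import requests

    def fetch_from_google_drive(file_id, out_path):
        URL = "https://docs.google.com/uc?export=download"
        session = requests.Session()
        response = session.get(URL, params={"id": file_id}, stream=True)
        # large files return an HTML warning page instead of the file; the
        # bypass token comes back in a cookie named download_warning...
        token = next((v for k, v in response.cookies.items()
                      if k.startswith("download_warning")), None)
        if token:
            response = session.get(URL, params={"id": file_id, "confirm": token}, stream=True)
        with open(out_path, "wb") as f:
            for chunk in response.iter_content(32768):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)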
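Post listing is paged: the kemono.party API returns at most 25 posts per call, offset by the o query parameter, which is why get_amount_of_posts probes in steps of 25 and the main loop fetches ?o=page*25. The same paging collected into a single list, as a sketch; the endpoint shape is taken straight from the patch, the helper name is made up:

    import requests

    def get_all_posts(service, user):
        posts, offset = [], 0
        while True:
            page = requests.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(service, user, offset)).json()
            posts.extend(page)
            if len(page) < 25:  # a short page means the listing is exhausted
                return posts
            offset += 25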