changeset 117:40a7b6d9bd3b
officially deprecate kemonopartydownloader.py
committer: GitHub <noreply@github.com>
author | Paper <37962225+mrpapersonic@users.noreply.github.com>
date | Fri, 03 Mar 2023 22:33:53 +0000
parents | 205fc01d5eb4
children | eac6dae753ca
files | README.md kemonopartydownloader.py
diffstat | 2 files changed, 2 insertions(+), 357 deletions(-)
--- a/README.md	Sat Feb 04 13:46:06 2023 -0500
+++ b/README.md	Fri Mar 03 22:33:53 2023 +0000
@@ -24,10 +24,8 @@
 ### [intro.py](intro.py)
 originally created to extract all of the yuru yuri intros
 
-### [kemonopartydownloader.py](kemonopartydownloader.py)
-made this when i was bored<br>
-simple [kemono.party](https://kemono.party) downloader, supports proxies and continuing unfinished downloads<br>
-requires cookies because of DDoS-GUARD, just use a cookies.txt compatible file<br>
+### [kemonopartydownloader.py]
+This script has been deprecated and deleted from this repository, use [gallery-dl](https://github.com/mikf/gallery-dl) and parse the JSON for links instead
 
 ### [mmltonsf.bat](mmltonsf.bat)
 this is a conversion of a script from bash to batch, it just makes compiling nsfs from mmls easier
--- a/kemonopartydownloader.py	Sat Feb 04 13:46:06 2023 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,353 +0,0 @@
-"""
-Usage:
-  kemonopartydownloader.py <url>... (--cookies <filename>)
-                                    [--output <folder>]
-                                    [--proxy <proxy>]
-                                    [--timeout <seconds>]
-  kemonopartydownloader.py -h | --help
-
-Arguments:
-  <url>                        Kemono.party URL to download from
-  -c --cookies <filename>      A Netscape-compatible cookies.txt file
-
-Options:
-  -h --help                    Show this screen
-  -o --output <folder>         Output folder, relative to the current directory
-                               [default: .]
-  -p --proxy <proxy>           HTTP or HTTPS proxy (SOCKS5 with PySocks)
-  -t --timeout <seconds>       Time between downloads [default: 1]
-"""
-import docopt
-import http.cookiejar
-import os
-import re
-import requests  # pip install requests
-import time
-import math
-import zipfile
-import urllib.parse
-import yt_dlp
-from yt_dlp.utils import sanitize_filename as sanitize
-from urllib.error import HTTPError
-from http.client import BadStatusLine
-
-
-def download_folder_from_google_drive(link: str) -> int:
-    takeout_domain = "https://takeout-pa.clients6.google.com"
-    drive_id = link.split("?")[0].split("/")[-1]
-    ses = requests.Session()
-    ses.headers = {
-        'origin': 'https://drive.google.com',
-        'content-type': 'application/json',
-    }
-    key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE"  # google anonymous key
-    takeoutjs = ses.post(takeout_domain + "/v1/exports?key=%s" % (key),
-                         data='{"items":[{"id":"%s"}]}' % (drive_id)).json()
-    takeoutid = str(takeoutjs["exportJob"]["id"])
-    storagePath = None
-    while storagePath is None:
-        succeededjson = ses.get(takeout_domain + "/v1/exports/%s?key=%s"
-                                % (takeoutid, key)).json()["exportJob"]
-        if succeededjson["status"] == "SUCCEEDED":
-            storagePath = str(succeededjson["archives"][0]
-                              ["storagePath"])
-        time.sleep(1)
-    size = 0
-    for path, dirs, files in os.walk("./%s/Drive - %s"
-                                     % (output, sanitize(i["title"]))):
-        for f in files:
-            fp = os.path.join(path, f)
-            size += os.path.getsize(fp)
-    try:
-        if size >= int(succeededjson["archives"][0]["sizeOfContents"]):
-            print(" %s already downloaded!" % (succeededjson["archives"][0]
-                                               ["fileName"]))
-            return 1
-    except Exception as e:
-        print(" %s download failed! %s" % (succeededjson["archives"][0]
-                                           ["fileName"], str(e)))
-    response = ses.get(storagePath, stream=True)
-    amountdone = 0
-    with open(succeededjson["archives"][0]["fileName"], "wb") as f:
-        for chunk in response.iter_content(4096):
-            if chunk:  # filter out keep-alive new chunks
-                f.write(chunk)
-                amountdone += 4096
-                print(" downloading %s: %.2f%%"
-                      % (succeededjson["archives"][0]["fileName"],
-                         (amountdone / int(succeededjson["archives"][0]
-                                           ["compressedSize"])) * 100), end="\r")
-    print(" downloaded %s: 100.00%%    "
-          % (succeededjson["archives"][0]["fileName"]))
-    unzip(succeededjson["archives"][0]["fileName"], "./%s/Drive - %s"
-                                                    % (output,
-                                                       sanitize(i["title"])))
-    os.remove(succeededjson["archives"][0]["fileName"])
-    return 0
-
-
-def unzip(src_path: str, dst_dir: str, pwd: str = None) -> None:
-    with zipfile.ZipFile(src_path) as zf:
-        members = zf.namelist()
-        for member in members:
-            arch_info = zf.getinfo(member)
-            arch_name = arch_info.filename.replace('/', os.path.sep)
-            dst_path = os.path.join(dst_dir, arch_name)
-            dst_path = os.path.normpath(dst_path)
-            if not os.path.exists(dst_path):
-                zf.extract(arch_info, dst_dir, pwd)
-
-
-def download_from_dropbox(link: str) -> None:
-    responsehead = req.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
-    if responsehead.status_code == 404:
-        print(" dropbox link not available!")
-        return
-    filename = urllib.parse.unquote(
-        responsehead.headers["Content-Disposition"].split("'")[-1])
-    if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])):
-        os.makedirs(output + "/Dropbox - " + sanitize(i["title"]))
-    filepath = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]),
-                                       sanitize(filename))
-    if os.path.exists(filepath):
-        print(" file(s) already downloaded!")
-        return
-    filesize = os.stat(filepath).st_size if os.path.exists(filepath) else 0
-    # will always be 0 if it's a folder...
-    if filesize == 0:
-        with req.get(link.split("?")[0] + "?dl=1", stream=True,
-                     headers={"Range": "bytes=%d-" % (filesize)}) as r:
-            r.raise_for_status()
-            with open(filepath, "ab") as f:
-                for chunk in r.iter_content(chunk_size=4096):
-                    f.write(chunk)
-                    filesize += 4096
-                    print(" file %s downloading..." % (filename), end="\r")
-            print(" file %s successfully downloaded!" % (filename))
-    if filename.endswith(".zip"):
-        unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0]))
-        os.remove(filepath)
-
-
-def download_from_youtube(link: str) -> int:  # int is the response
-    return 0  # just a stub for now
-
-
-# https://stackoverflow.com/a/39225039
-def download_file_from_google_drive(drive_id: str, out: str = "") -> None:
-    def get_confirm_token(response: requests.Response):
-        for key, value in response.cookies.items():
-            if key.startswith('download_warning'):
-                return value
-
-        return None
-
-    def save_response_content(response: requests.Response):
-        amountdone = 0
-        CHUNK_SIZE = 4096
-        filename = sanitize(
-            response.headers["Content-Disposition"].split("'")[-1])
-        folderpath = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
-        if not os.path.exists():
-            os.makedirs(folderpath)
-        destination = "%s/%s" % (folderpath, filename)
-        filesize = (os.stat(destination).st_size
-                    if os.path.exists(destination)
-                    else 0)
-        serverfilesize = int(response.headers["Content-Range"].split('/')[0])
-
-        if (os.path.exists(destination) and filesize == serverfilesize):
-            print(" %s already downloaded!"
-                  % os.path.basename(destination))
-            return
-
-        with open(destination, "wb") as f:
-            for chunk in response.iter_content(CHUNK_SIZE):
-                if chunk:  # filter out keep-alive new chunks
-                    f.write(chunk)
-                    amountdone += CHUNK_SIZE
-                    print(" downloading %s: %.2f%%"
-                          % (os.path.basename(destination),
-                             (amountdone / serverfilesize)), end="\r")
-            print(" downloaded %s: %.2f%%    "
-                  % (os.path.basename(destination), 100.0))
-
-    URL = "https://docs.google.com/uc?export=download"
-
-    session = requests.Session()
-
-    headers = {
-        "Range": "bytes=0-",
-    }
-
-    session.proxies = req.proxies
-
-    response = session.get(URL, headers=headers, params={'id': drive_id},
-                           stream=True)
-
-    while response.status_code == 403:
-        time.sleep(30)
-        response = session.get(URL, headers=headers, params={'id': drive_id},
-                               stream=True)
-
-    if response.status_code == 404:
-        return  # bypass when root folder has no files
-
-    token = get_confirm_token(response)
-
-    if token:
-        params = {'id': drive_id, 'confirm': token}
-        response = session.get(URL, headers=headers, params=params,
-                               stream=True)
-
-    save_response_content(response)
-
-
-def find_urls(s: str) -> list:
-    url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" +
-                 """%[0-9a-fA-F][0-9a-fA-F]))+""")
-    urllist = []
-    for findall in re.findall(url_regex, s):
-        urllist.append(findall.split("<")[0].split(">")[-1])
-    return urllist
-
-
-def download_file(i: dict, x: dict, count: int) -> None:
-    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count,
-                                    sanitize(i["title"]), x["name"])
-    amountdone = 0
-    filesize = 0
-    if os.path.exists(filename):
-        filesize = os.path.getsize(filename)
-    serverhead = req.head("https://kemono.party/data" + x['path'],
-                          allow_redirects=True)
-    for i in range(500):
-        serverfilesize = int(serverhead.headers["Content-Length"])
-        if filesize < serverfilesize:
-            amountdone += filesize
-            with req.get("https://kemono.party/data" + x['path'], stream=True,
-                         headers={"Range": "bytes=%d-" % (filesize)}) as r:
-                r.raise_for_status()
-                with open(filename, "ab") as f:
-                    for chunk in r.iter_content(chunk_size=4096):
-                        f.write(chunk)
-                        amountdone += len(chunk)
-                        print(" downloading image %d: %.2f%%"
-                              % (count, (amountdone / serverfilesize) * 100),
-                              end="\r")
-            print(" downloaded image " + str(count) + ": 100.00%    ")
-            return
-        else:
-            print(" image " + str(count) + " already downloaded!")
-            return
-        time.sleep(10)
-    print(" download timed out!")
-    return
-
-
-def parse_json(i: dict, count: int) -> None:
-    unique_urls = []
-    for url in find_urls(i["content"]):
-        # spaghetti
-        parsed_url = urllib.parse.urlparse(url)
-        if parsed_url.netloc == "drive.google.com":
-            if parsed_url.path.startswith("/drive/folders"):
-                if url not in unique_urls:
-                    download_folder_from_google_drive(url)
-                    unique_urls.append(url)
-            elif (parsed_url.path == "/open" and
-                  parsed_url.query.startswith == "id"):
-                if url not in unique_urls:
-                    download_file_from_google_drive(
-                        parsed_url.query.split("=")
-                        [-1])
-                    unique_urls.append(url)
-            elif parsed_url.path.startswith("/file/"):
-                if url not in unique_urls:
-                    download_file_from_google_drive(parsed_url.path.split("/")
-                                                    [-2])
-                    unique_urls.append(url)
-        elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
-            if url not in unique_urls:
-                download_from_dropbox(url)
-                unique_urls.append(url)
-        elif parsed_url.netloc in ["youtube.com", "youtu.be", "www.youtube.com"]:
-            if url not in unique_urls:
-                download_from_youtube(url)
-                unique_urls.append(url)
-    for x in i["attachments"]:
-        count += 1
-        while not os.path.exists("%s/%s_%dp_%s_%s"
-                                 % (output, i["id"], count,
-                                    sanitize(i["title"]), x["name"])):
-            try:
-                download_file(i, x, count)
-            except (HTTPError, BadStatusLine):
-                while 1:
-                    time.sleep(10)
-                    download_file(i, x, count)
-            except Exception as e:
-                print(e)
-            time.sleep(timeout)
-
-
-def get_amount_of_posts(s: str, u: str):
-    amount = 0
-    while 1:
-        data = req.get("https://kemono.party/api/%s/user/%s?o=%d"
-                       % (s, u, amount)).json()
-        if len(data) < 25:
-            return math.ceil((amount + 1) / 25)
-        amount += 25
-
-
-args = docopt.docopt(__doc__)
-
-req = requests.Session()
-
-if args["--proxy"]:
-    req.proxies = {
-        "http": args["--proxy"],
-        "https": args["--proxy"],
-    }
-
-timeout = int(args["--timeout"])
-
-cj = http.cookiejar.MozillaCookieJar(args["--cookies"])
-cj.load(ignore_expires=True)
-req.cookies = cj
-
-for url in args["<url>"]:
-    if url.isnumeric():
-        print("do not input user IDs in --url! use a link instead")
-        continue
-
-    if url.split("/")[-2] == "post":
-        service = url.split("/")[-5]
-        user = url.split("/")[-3]
-        post = url.split("/")[-1]
-        pages = 1
-    elif url.split("/")[-2] == "user":
-        service = url.split("/")[-3]
-        user = url.split("/")[-1]
-        pages = get_amount_of_posts(service, user)
-
-    output = "%s/%s-%s" % (args["--output"], service, user)
-
-    if not os.path.exists(output):
-        os.mkdir(output)
-
-    for page in range(pages):
-        try:
-            post
-            userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s"
-                               % (service, user, post)).json()
-        except Exception:
-            userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s"
-                               % (service, user, (page * 25))).json()
-        for i in userdata:
-            print(i["id"])
-            count = 0
-            parse_json(i, count)
-            filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count,
-                                                   sanitize(i["title"]))
-            json.dump(i, filename)