changeset 97:f1f4f6da04bd
kemonopartydownloader.py: convert to percent formatting,
add typing to functions and "fix" the drive downloading
author:    Paper <37962225+mrpapersonic@users.noreply.github.com>
committer: GitHub <noreply@github.com>
date:      Sat, 13 Aug 2022 20:27:58 -0400
parents:   d2e0edd4a070
children:  e4bf37150a3f
files:     kemonopartydownloader.py
diffstat:  1 files changed, 82 insertions(+), 82 deletions(-)
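The diff below applies one mechanical pattern throughout: str.format() calls and f-strings become %-style formatting, and function signatures gain type annotations. A minimal sketch of that pattern, using an illustrative function that does not appear in the script:

def progress(name: str, done: int, total: int) -> str:
    # old style:  " downloading {0}: {1:.2f}%".format(name, (done / total) * 100)
    # new style:  %s formats any object via str(), %.2f gives two decimals,
    # and a literal percent sign is written as %%
    return " downloading %s: %.2f%%" % (name, (done / total) * 100)

print(progress("archive.zip", 512, 1024))  # -> " downloading archive.zip: 50.00%"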
--- a/kemonopartydownloader.py	Sun Aug 07 11:57:09 2022 -0400
+++ b/kemonopartydownloader.py	Sat Aug 13 20:27:58 2022 -0400
@@ -1,6 +1,3 @@
-# example args.url: https://kemono.party/fanbox/user/5375435/post/2511461
-# created by Paper in 2021
-# please do not share without crediting me!
 import argparse
 import http.cookiejar
 import os
@@ -15,33 +12,33 @@
 from http.client import BadStatusLine
 
-def under_num(maximum, num):
+def under_num(maximum: int, num: int) -> int:
     return num if num <= maximum else maximum
 
 
-def download_folder_from_google_drive(link):
+def download_folder_from_google_drive(link: str) -> int:
     session = requests.Session()
     session.headers = {
         'origin': 'https://drive.google.com',
         'content-type': 'application/json',
     }
     key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE"  # google anonymous key
-    takeoutjs = session.post(f"https://takeout-pa.clients6.google.com/v1/exports?key={key}", data='{{"items":[{{"id":"{0}"}}]}}'.format(link.split("?")[0].split("/")[-1])).json()
-    takeoutid = takeoutjs["exportJob"]["id"]
+    takeoutjs = session.post("https://takeout-pa.clients6.google.com/v1/exports?key=%s" % (key), data='{"items":[{"id":"%s"}]}' % (link.split("?")[0].split("/")[-1])).json()
+    takeoutid = str(takeoutjs["exportJob"]["id"])
     storagePath = None
     while storagePath is None:
-        succeededjson = session.get("https://takeout-pa.clients6.google.com/v1/exports/{0}?key={1}".format(takeoutid, key)).json()
+        succeededjson = session.get("https://takeout-pa.clients6.google.com/v1/exports/%s?key=%s" % (takeoutid, key)).json()
         if succeededjson["exportJob"]["status"] == "SUCCEEDED":
-            storagePath = succeededjson["exportJob"]["archives"][0]["storagePath"]
+            storagePath = str(succeededjson["exportJob"]["archives"][0]["storagePath"])
         time.sleep(1)
     size = 0
-    for path, dirs, files in os.walk("./{0}/Drive - {1}".format(output, sanitize(i["title"]))):
+    for path, dirs, files in os.walk("./%s/Drive - %s" % (output, sanitize(i["title"]))):
         for f in files:
             fp = os.path.join(path, f)
             size += os.path.getsize(fp)
     try:
         if size >= int(succeededjson["exportJob"]["archives"][0]["sizeOfContents"]):
-            print(" {0} already downloaded!".format(succeededjson["exportJob"]["archives"][0]["fileName"]))
-            return
+            print(" %s already downloaded!" % (succeededjson["exportJob"]["archives"][0]["fileName"]))
+            return 1
     except Exception as e:
         print(" %s download failed!\n%s" % (succeededjson["exportJob"]["archives"][0]["fileName"], str(e)))
         print(e)
@@ -52,13 +49,14 @@
             if chunk:  # filter out keep-alive new chunks
                 f.write(chunk)
                 amountdone += 1024
-                print(" downloading {0}: ".format(succeededjson["exportJob"]["archives"][0]["fileName"]) + " " + str(round((amountdone / int(succeededjson['exportJob']['archives'][0]['compressedSize'])) * 100, 2)) + "%\r", end="")
-    print(" downloaded {0}".format(succeededjson["exportJob"]["archives"][0]["fileName"]) + ": 100.00% ")
-    unzip(succeededjson["exportJob"]["archives"][0]["fileName"], "./{0}/Drive - {1}".format(output, sanitize(i["title"])))
+                print(" downloading %s: %.2f%%" % (succeededjson["exportJob"]["archives"][0]["fileName"], (amountdone / int(succeededjson['exportJob']['archives'][0]['compressedSize'])) * 100), end="\r")
+    print(" downloaded %s: 100.00%% " % (succeededjson["exportJob"]["archives"][0]["fileName"]))
+    unzip(succeededjson["exportJob"]["archives"][0]["fileName"], "./%s/Drive - %s" % (output, sanitize(i["title"])))
     os.remove(succeededjson["exportJob"]["archives"][0]["fileName"])
+    return 0
 
 
-def unzip(src_path, dst_dir, pwd=None):
+def unzip(src_path: str, dst_dir: str, pwd=None) -> None:
     with zipfile.ZipFile(src_path) as zf:
         members = zf.namelist()
         for member in members:
@@ -70,54 +68,49 @@
             zf.extract(arch_info, dst_dir, pwd)
 
 
-def download_from_dropbox(link):
+def download_from_dropbox(link: str) -> None:
     responsehead = req.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
     if responsehead.status_code == 404:
         print(" dropbox link not available!")
         return
     if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])):
         os.makedirs(output + "/Dropbox - " + sanitize(i["title"]))
-    filename = output + "/Dropbox - " + sanitize(i["title"]) + "/" + sanitize(responsehead.headers["Content-Disposition"].split("'")[-1])
+    filename = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]), sanitize(responsehead.headers["Content-Disposition"].split("'")[-1]))
    if os.path.exists(urllib.parse.unquote(os.path.splitext(filename)[0])) and os.path.isdir(urllib.parse.unquote(os.path.splitext(filename)[0])):
         print(" file(s) already downloaded!")
         return
-    if os.path.exists(filename):
-        filesize = os.stat(filename).st_size
-    else:
-        filesize = 0
-    serverfilesize = int(responsehead.headers["Content-Length"])
-    if filesize < serverfilesize:
-        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+    filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
+    # will always be 0 if it's a folder...
+    if filesize == 0:
+        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
             r.raise_for_status()
             with open(filename, "ab") as f:
                 for chunk in r.iter_content(chunk_size=4096):
                     f.write(chunk)
                     filesize += 4096
-                    print(" file {0} downloading: ".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])) + str(round((filesize / serverfilesize) * 100)) + "%\r", end="")
-        print(" {0} successfully downloaded!".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
+                    print(" file %s downloading..." % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])), end="\r")
+        print(" file %s successfully downloaded!" % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
     if responsehead.headers["Content-Disposition"].split("'")[-1].endswith(".zip"):
         unzip(filename, urllib.parse.unquote(os.path.splitext(filename)[0]))
         os.remove(filename)
 
 
-def download_file_from_google_drive(id, dir=""):  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039
-    def get_confirm_token(response):
+def download_file_from_google_drive(drive_id: str, out: str = "") -> None:  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039
+    def get_confirm_token(response: requests.Response):
         for key, value in response.cookies.items():
             if key.startswith('download_warning'):
                 return value
 
         return None
 
-    def save_response_content(response):
+    def save_response_content(response: requests.Response):
         amountdone = 0
         CHUNK_SIZE = 4096
-        if not os.path.exists(output + "/Drive - " + sanitize(i["title"]) + "/" + dir):
-            os.makedirs(output + "/Drive - " + sanitize(i["title"]) + "/" + dir)
-        destination = output + "/Drive - " + sanitize(i["title"]) + "/" + dir + "/" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
-        if os.path.exists(destination):
-            filesize = os.stat(destination).st_size
-        else:
-            filesize = 0
+        filename = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
+        if not os.path.exists():
+            os.makedirs(filename)
+        destination = filename + "/" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
+        filesize = os.stat(destination).st_size if os.path.exists(destination) else 0
 
         if os.path.exists(destination) and filesize == int(response.headers["Content-Range"].partition('/')[-1]):
             print(" " + os.path.basename(destination) + " already downloaded!")
@@ -128,8 +121,8 @@
                 if chunk:  # filter out keep-alive new chunks
                     f.write(chunk)
                     amountdone += CHUNK_SIZE
-                    print(" downloading {0}: ".format(os.path.basename(destination)) + " " + str(round((amountdone / int(response.headers["Content-Range"].partition('/')[-1])) * 100, 2)) + "%\r", end="")
-        print(" downloaded {0}".format(os.path.basename(destination)) + ": 100.00% ")
+                    print(" downloading %s: %.2f%%" % (os.path.basename(destination), (amountdone / int(response.headers["Content-Range"].partition('/')[-1]))), end="\r")
+        print(" downloaded %s: %.2f%% " % (os.path.basename(destination), 100.0))
 
     URL = "https://docs.google.com/uc?export=download"
 
@@ -141,11 +134,11 @@
 
     session.proxies = req.proxies
 
-    response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+    response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
 
     while response.status_code == 403:
         time.sleep(30)
-        response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+        response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
 
     if response.status_code == 404:
         return  # bypass when root folder has no files
@@ -153,41 +146,38 @@
     token = get_confirm_token(response)
 
     if token:
-        params = {'id': id, 'confirm': token}
+        params = {'id': drive_id, 'confirm': token}
         response = session.get(URL, headers=headers, params=params, stream=True)
 
     save_response_content(response)
 
 
-def sanitize(filename):
+def sanitize(filename: str) -> str:
     return re.sub(r"[\/:*?\"<>|]", "_", filename).strip()
 
 
-def find_urls(s):
+def find_urls(s: str) -> list:
     urllist = []
     for findall in re.findall(r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+""", s):
         urllist.append(findall.split("<")[0].split(">")[-1])
     return urllist
 
 
-def downloadfile(i, x, count):
-    filename = "{4}/{0}_{1}p_{2}_{3}".format(i["id"], count, sanitize(i["title"]), x["name"], output)
+def download_file(i: dict, x: dict, count: int) -> None:
+    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count, sanitize(i["title"]), x["name"])
     amountdone = 0
-    if os.path.exists(filename):
-        filesize = os.stat(filename).st_size
-    else:
-        filesize = 0
+    filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
     serverhead = req.head("https://kemono.party/data" + x['path'], allow_redirects=True)
     for i in range(500):
         serverfilesize = int(serverhead.headers["Content-Length"])
         if filesize < serverfilesize:
-            with req.get(f"https://kemono.party/data{x['path']}", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+            with req.get("https://kemono.party/data" + x['path'], stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
                 r.raise_for_status()
                 with open(filename, "ab") as f:
                     for chunk in r.iter_content(chunk_size=4096):
                         f.write(chunk)
                         amountdone += len(chunk)
-                        print(" downloading image " + str(count) + ": " + "{:.2f}".format(under_num(100, round(((filesize + amountdone) / serverfilesize) * 100, 2))), end="%\r")
+                        print(" downloading image %d: %.2f%%" % (count, under_num(100, round(((filesize + amountdone) / serverfilesize) * 100, 2))), end="\r")
             print(" downloaded image " + str(count) + ": 100.00% ")
             return
         else:
@@ -198,26 +188,28 @@
     return
 
 
-def parse_json(i, count):
-    seen = set()
+def parse_json(i: dict, count: int) -> None:
+    seen_gdrive_ids = set()
     unique_urls = []
     for url in find_urls(i["content"]):
-        if url.startswith("https://drive.google.com/drive/folders"):
-            if url.split("/")[-1].split("?")[0] not in seen:
+        parsed_url = urllib.parse.urlparse(url)
+        if parsed_url.netloc == "drive.google.com":
+            if parsed_url.path.startswith("drive/folders"):
+                if parsed_url.path.split("/")[-1] not in seen_gdrive_ids:
+                    unique_urls.append(url)
+                    seen_gdrive_ids.add(parsed_url.path.split("/")[-1])
+            elif parsed_url.path == "open" and parsed_url.query.startswith == "id":
+                if parsed_url.query.split("=")[-1] not in seen_gdrive_ids:
+                    unique_urls.append(req.head(url).headers["Location"], allow_redirects=True)
+                    seen_gdrive_ids.add(parsed_url.query.split("=")[-1])
+            elif parsed_url.path.startswith("file/"):
+                if parsed_url.path.split("/")[-2] not in seen_gdrive_ids:
+                    unique_urls.append(url)
+                    seen_gdrive_ids.add(url.split("?")[0].split("/")[-2])
+        elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
+            if url not in unique_urls:
                 unique_urls.append(url)
-                seen.add(url.split("/")[-1].split("?")[0])
-        elif url.startswith("https://drive.google.com/open?id="):
-            if url.split("?id=")[-1] not in seen:
-                unique_urls.append(req.head(url).headers["Location"], allow_redirects=True)
-                seen.add(url.split("/")[-1].split("?")[0])
-        elif url.startswith("https://drive.google.com/file/"):
-            if url.split("?")[0].split("/")[-2] not in seen:
-                unique_urls.append(url)
-                seen.add(url.split("?")[0].split("/")[-2])
-        elif url.startswith("https://www.dropbox.com"):
-            print(" Dropbox link found! attempting to download its files...")
-            download_from_dropbox(url)
-        else:  # TODO: add MEGA, or some sort of other file hosting website(s). gdrive and dropbox seem like the most popular ones atm
+        else:
             pass
     for url in unique_urls:
         if url.startswith("https://drive.google.com/drive/folders/"):
@@ -227,29 +219,28 @@
         elif url.startswith("https://drive.google.com/file/"):
             print(" Google Drive link found! attempting to download its files...")
             download_file_from_google_drive(url.split("?")[0].split("/")[-2])
+        elif url.startswith("https://www.dropbox.com/"):
+            print(" Dropbox link found! attempting to download its files...")
+            download_from_dropbox(url)
     for x in i["attachments"]:
         count += 1
-        while not os.path.exists("{4}/{0}_{1}p_{2}_{3}".format(int(i["id"]) - 1, count, sanitize(i["title"]), x["name"], output)):
+        while not os.path.exists("%s/%d_%dp_%s_%s" % (output, int(i["id"]) - 1, count, sanitize(i["title"]), x["name"])):
             try:
-                downloadfile(i, x, count)
+                download_file(i, x, count)
                 break
-            except HTTPError:
+            except (HTTPError, BadStatusLine):
                 while 1:
                     time.sleep(10)
-                    downloadfile(i, x, count)
-            except BadStatusLine:  # DDoS-GUARD
-                while 1:
-                    time.sleep(10)
-                    downloadfile(i, x, count)
+                    download_file(i, x, count)
             except Exception as e:
                 print(e)
             time.sleep(10)
 
 
-def get_amount_of_posts(s, u):
+def get_amount_of_posts(s: str, u: str):
     amount = 0
     while 1:
-        data = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(s, u, amount)).json()
+        data = req.get("https://kemono.party/api/%s/user/%s?o=%d" % (s, u, amount)).json()
         if len(data) < 25:
             return math.ceil((amount + 1) / 25)
         amount += 25
@@ -265,6 +256,14 @@
 
 req = requests.Session()
 
+if args.testdownloadservices:
+    output = "download_services_test"
+    i = {"title": "DEEZ NUTS"}
+    download_folder_from_google_drive("https://drive.google.com/drive/folders/1rZN2ejZnGdF0EpaZuknlDp26a0qSjsEI")
+    download_from_dropbox("https://www.dropbox.com/s/yg405bpznyobo3u/test.txt?dl=0")  # File
+    download_from_dropbox("https://www.dropbox.com/sh/ne3c7bxtkt5tg4s/AABYPNGfHoil4HO_btudw0wPa?dl=0")  # Folder
+    exit()
+
 if args.proxy:
     req.proxies = {}
     if args.proxy[:6] == "socks5":
@@ -293,6 +292,7 @@
     except Exception:
         pass
 
+# TODO: use urlparse here...
 if args.url.split("/")[-2] == "post":
     service = args.url.split("/")[-5]
     user = args.url.split("/")[-3]
@@ -302,7 +302,7 @@
     user = args.url.split("/")[-1]
 
 if not args.output:
-    output = user
+    output = "%s-%s" % (service, user)
 else:
     output = args.output
 
@@ -319,9 +319,9 @@
 for page in range(pages):
     try:
         post
-        userdata = req.get("https://kemono.party/api/{0}/user/{1}/post/{2}".format(service, user, post)).json()
+        userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s" % (service, user, post)).json()
     except Exception:
-        userdata = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(service, user, (page * 25))).json()
+        userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s" % (service, user, (page * 25))).json()
     for i in userdata:
         print(i["id"])
         count = 0
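The quotation marks around "fix" in the commit message are apt: several of the added lines cannot run as committed. In save_response_content, os.path.exists() is called without its required path argument and raises a TypeError. In parse_json, parsed_url.query.startswith == "id" compares the method object to a string instead of calling it (always False); the paths returned by urllib.parse.urlparse keep their leading "/", so startswith("drive/folders") and the comparison against "open" never match; and list.append() takes exactly one argument, so allow_redirects=True belongs to the req.head(...) call. A sketch of what those lines presumably intended; this is a hypothetical correction, not code from this changeset:

# hypothetical corrections, not part of changeset 97
filename = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
if not os.path.exists(filename):  # exists() needs the path argument
    os.makedirs(filename)

parsed_url = urllib.parse.urlparse(url)
if parsed_url.netloc == "drive.google.com":
    if parsed_url.path.startswith("/drive/folders"):  # urlparse keeps the leading "/"
        ...
    elif parsed_url.path == "/open" and parsed_url.query.startswith("id"):  # call, not compare
        # follow the redirect, then append the single resulting URL;
        # a followed response has no Location header, so use .url instead
        unique_urls.append(req.head(url, allow_redirects=True).url)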