changeset 97:f1f4f6da04bd

kemonopartydownloader.py: convert to percent formatting, add typing to functions and "fix" the drive downloading
committer GitHub <noreply@github.com>
author Paper <37962225+mrpapersonic@users.noreply.github.com>
date Sat, 13 Aug 2022 20:27:58 -0400
parents d2e0edd4a070
children e4bf37150a3f
files kemonopartydownloader.py
diffstat 1 files changed, 82 insertions(+), 82 deletions(-)
--- a/kemonopartydownloader.py	Sun Aug 07 11:57:09 2022 -0400
+++ b/kemonopartydownloader.py	Sat Aug 13 20:27:58 2022 -0400
@@ -1,6 +1,3 @@
-# example args.url: https://kemono.party/fanbox/user/5375435/post/2511461
-# created by Paper in 2021
-# please do not share without crediting me!
 import argparse
 import http.cookiejar
 import os
@@ -15,33 +12,33 @@
 from http.client import BadStatusLine
 
 
-def under_num(maximum, num):
+def under_num(maximum: int, num: int) -> int:
     return num if num <= maximum else maximum
 
-def download_folder_from_google_drive(link):
+def download_folder_from_google_drive(link: str) -> int:
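+    # returns 1 if the archive already exists on disk, 0 after a fresh download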
     session = requests.Session()
     session.headers = {
         'origin': 'https://drive.google.com',
         'content-type': 'application/json',
     }
     key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE"  # google anonymous key
-    takeoutjs = session.post(f"https://takeout-pa.clients6.google.com/v1/exports?key={key}", data='{{"items":[{{"id":"{0}"}}]}}'.format(link.split("?")[0].split("/")[-1])).json()
-    takeoutid = takeoutjs["exportJob"]["id"]
+    takeoutjs = session.post("https://takeout-pa.clients6.google.com/v1/exports?key=%s" % (key), data='{"items":[{"id":"%s"}]}' % (link.split("?")[0].split("/")[-1])).json()
+    takeoutid = str(takeoutjs["exportJob"]["id"])
     storagePath = None
     while storagePath is None:
-        succeededjson = session.get("https://takeout-pa.clients6.google.com/v1/exports/{0}?key={1}".format(takeoutid, key)).json()
+        succeededjson = session.get("https://takeout-pa.clients6.google.com/v1/exports/%s?key=%s" % (takeoutid, key)).json()
         if succeededjson["exportJob"]["status"] == "SUCCEEDED":
-            storagePath = succeededjson["exportJob"]["archives"][0]["storagePath"]
+            storagePath = str(succeededjson["exportJob"]["archives"][0]["storagePath"])
         time.sleep(1)
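+    # add up what is already on disk so a completed archive isn't downloaded again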
     size = 0
-    for path, dirs, files in os.walk("./{0}/Drive - {1}".format(output, sanitize(i["title"]))):
+    for path, dirs, files in os.walk("./%s/Drive - %s" % (output, sanitize(i["title"]))):
         for f in files:
             fp = os.path.join(path, f)
             size += os.path.getsize(fp)
     try:
         if size >= int(succeededjson["exportJob"]["archives"][0]["sizeOfContents"]):
-            print("  {0} already downloaded!".format(succeededjson["exportJob"]["archives"][0]["fileName"]))
-            return
+            print("  %s already downloaded!" % (succeededjson["exportJob"]["archives"][0]["fileName"]))
+            return 1  # already downloaded
     except Exception as e:
         print("  %s download failed! %s" % (succeededjson["exportJob"]["archives"][0]["fileName"], str(e)))
         print(e)
@@ -52,13 +49,14 @@
             if chunk:  # filter out keep-alive new chunks
                 f.write(chunk)
                 amountdone += 1024
-                print("  downloading {0}: ".format(succeededjson["exportJob"]["archives"][0]["fileName"]) + " " + str(round((amountdone / int(succeededjson['exportJob']['archives'][0]['compressedSize'])) * 100, 2)) + "%\r", end="")
-        print("  downloaded  {0}".format(succeededjson["exportJob"]["archives"][0]["fileName"]) + ": 100.00%    ")
-    unzip(succeededjson["exportJob"]["archives"][0]["fileName"], "./{0}/Drive - {1}".format(output, sanitize(i["title"])))
+                print("  downloading %s: %.2f%%" % (succeededjson["exportJob"]["archives"][0]["fileName"], (amountdone / int(succeededjson['exportJob']['archives'][0]['compressedSize'])) * 100), end="\r")
+        print("  downloaded  %s: 100.00%%    " % (succeededjson["exportJob"]["archives"][0]["fileName"]))
+    unzip(succeededjson["exportJob"]["archives"][0]["fileName"], "./%s/Drive - %s" % (output, sanitize(i["title"])))
     os.remove(succeededjson["exportJob"]["archives"][0]["fileName"])
+    return 0
 
 
-def unzip(src_path, dst_dir, pwd=None):
+def unzip(src_path: str, dst_dir: str, pwd=None) -> None:
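+    # extract every member of the zip into dst_dir, optionally using a password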
     with zipfile.ZipFile(src_path) as zf:
         members = zf.namelist()
         for member in members:
@@ -70,54 +68,49 @@
                 zf.extract(arch_info, dst_dir, pwd)
 
 
-def download_from_dropbox(link):
+def download_from_dropbox(link: str) -> None:
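+    # ?dl=1 makes Dropbox serve the file (or a zipped folder) directly instead of the preview page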
     responsehead = req.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
     if responsehead.status_code == 404:
         print("  dropbox link not available!")
         return
     if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])):
         os.makedirs(output + "/Dropbox - " + sanitize(i["title"]))
-    filename = output + "/Dropbox - " + sanitize(i["title"]) + "/" + sanitize(responsehead.headers["Content-Disposition"].split("'")[-1])
+    filename = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]), sanitize(responsehead.headers["Content-Disposition"].split("'")[-1]))
     if os.path.exists(urllib.parse.unquote(os.path.splitext(filename)[0])) and os.path.isdir(urllib.parse.unquote(os.path.splitext(filename)[0])):
         print("  file(s) already downloaded!")
         return
-    if os.path.exists(filename):
-        filesize = os.stat(filename).st_size
-    else:
-        filesize = 0
-    serverfilesize = int(responsehead.headers["Content-Length"])
-    if filesize < serverfilesize:
-        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+    filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
+    # will always be 0 if it's a folder...
+    if filesize == 0:
+        with req.get(link.split("?")[0] + "?dl=1", stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
             r.raise_for_status()
             with open(filename, "ab") as f:
                 for chunk in r.iter_content(chunk_size=4096):
                     f.write(chunk)
                     filesize += 4096
-                    print("  file {0} downloading: ".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])) + str(round((filesize / serverfilesize) * 100)) + "%\r", end="")
-                print("  {0} successfully downloaded!".format(urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
+                    print("  file %s downloading..." % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])), end="\r")
+                print("  file %s successfully downloaded!" % (urllib.parse.unquote(responsehead.headers["Content-Disposition"].split("'")[-1])))
     if responsehead.headers["Content-Disposition"].split("'")[-1].endswith(".zip"):
         unzip(filename, urllib.parse.unquote(os.path.splitext(filename)[0]))
         os.remove(filename)
 
 
-def download_file_from_google_drive(id, dir=""):  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039
-    def get_confirm_token(response):
+def download_file_from_google_drive(drive_id: str, out: str = "") -> None:  # https://stackoverflow.com/questions/25010369/wget-curl-large-file-from-google-drive/39225039
+    def get_confirm_token(response: requests.Response):
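+        # large files hit Google's virus-scan warning page; the confirm token comes back in a download_warning cookie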
         for key, value in response.cookies.items():
             if key.startswith('download_warning'):
                 return value
 
         return None
 
-    def save_response_content(response):
+    def save_response_content(response: requests.Response):
         amountdone = 0
         CHUNK_SIZE = 4096
-        if not os.path.exists(output + "/Drive - " + sanitize(i["title"]) + "/" + dir):
-            os.makedirs(output + "/Drive - " + sanitize(i["title"]) + "/" + dir)
-        destination = output + "/Drive - " + sanitize(i["title"]) + "/" + dir + "/" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
-        if os.path.exists(destination):
-            filesize = os.stat(destination).st_size
-        else:
-            filesize = 0
+        filename = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
+        if not os.path.exists(filename):
+            os.makedirs(filename)
+        destination = filename + "/" + sanitize(response.headers["Content-Disposition"].split("'")[-1])
+        filesize = os.stat(destination).st_size if os.path.exists(destination) else 0
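+        # the file is complete once its size matches the total reported in Content-Range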
 
         if os.path.exists(destination) and filesize == int(response.headers["Content-Range"].partition('/')[-1]):
             print("  " + os.path.basename(destination) + " already downloaded!")
@@ -128,8 +121,8 @@
                 if chunk:  # filter out keep-alive new chunks
                     f.write(chunk)
                     amountdone += CHUNK_SIZE
-                    print("  downloading {0}: ".format(os.path.basename(destination)) + " " + str(round((amountdone / int(response.headers["Content-Range"].partition('/')[-1])) * 100, 2)) + "%\r", end="")
-            print("  downloaded  {0}".format(os.path.basename(destination)) + ": 100.00%    ")
+                    print("  downloading %s: %.2f%%" % (os.path.basename(destination), (amountdone / int(response.headers["Content-Range"].partition('/')[-1]))), end="\r")
+            print("  downloaded  %s: %.2f%%    " % (os.path.basename(destination), 100.0))
 
     URL = "https://docs.google.com/uc?export=download"
 
@@ -141,11 +134,11 @@
 
     session.proxies = req.proxies
 
-    response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+    response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
 
     while response.status_code == 403:
         time.sleep(30)
-        response = session.get(URL, headers=headers, params={'id': id}, stream=True)
+        response = session.get(URL, headers=headers, params={'id': drive_id}, stream=True)
 
     if response.status_code == 404:
         return  # bypass when root folder has no files
@@ -153,41 +146,38 @@
     token = get_confirm_token(response)
 
     if token:
-        params = {'id': id, 'confirm': token}
+        params = {'id': drive_id, 'confirm': token}
         response = session.get(URL, headers=headers, params=params, stream=True)
 
     save_response_content(response)
 
 
-def sanitize(filename):
+def sanitize(filename: str) -> str:
     return re.sub(r"[\/:*?\"<>|]", "_", filename).strip()
 
 
-def find_urls(s):
+def find_urls(s: str) -> list:
     urllist = []
     for findall in re.findall(r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+""", s):
         urllist.append(findall.split("<")[0].split(">")[-1])
     return urllist
 
 
-def downloadfile(i, x, count):
-    filename = "{4}/{0}_{1}p_{2}_{3}".format(i["id"], count, sanitize(i["title"]), x["name"], output)
+def download_file(i: dict, x: dict, count: int) -> None:
+    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count, sanitize(i["title"]), x["name"])
     amountdone = 0
-    if os.path.exists(filename):
-        filesize = os.stat(filename).st_size
-    else:
-        filesize = 0
+    filesize = os.stat(filename).st_size if os.path.exists(filename) else 0
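+    # resume partial downloads with a Range request starting at the current file size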
     serverhead = req.head("https://kemono.party/data" + x['path'], allow_redirects=True)
     for i in range(500):
         serverfilesize = int(serverhead.headers["Content-Length"])
         if filesize < serverfilesize:
-            with req.get(f"https://kemono.party/data{x['path']}", stream=True, headers={"Range": f"bytes={filesize}-"}) as r:
+            with req.get("https://kemono.party/data" + x['path'], stream=True, headers={"Range": "bytes=%d-" % (filesize)}) as r:
                 r.raise_for_status()
                 with open(filename, "ab") as f:
                     for chunk in r.iter_content(chunk_size=4096):
                         f.write(chunk)
                         amountdone += len(chunk)
-                        print(" downloading image " + str(count) + ": " + "{:.2f}".format(under_num(100, round(((filesize + amountdone) / serverfilesize) * 100, 2))), end="%\r")
+                        print(" downloading image %d: %.2f%%" % (count, under_num(100, round(((filesize + amountdone) / serverfilesize) * 100, 2))), end="\r")
                     print(" downloaded image " + str(count) + ": 100.00%  ")
             return
         else:
@@ -198,26 +188,28 @@
     return
 
 
-def parse_json(i, count):
-    seen = set()
+def parse_json(i: dict, count: int) -> None:
+    seen_gdrive_ids = set()
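+    # track Drive file/folder IDs so the same item linked more than once is only queued once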
     unique_urls = []
     for url in find_urls(i["content"]):
-        if url.startswith("https://drive.google.com/drive/folders"):
-            if url.split("/")[-1].split("?")[0] not in seen:
+        parsed_url = urllib.parse.urlparse(url)
+        if parsed_url.netloc == "drive.google.com":
+            if parsed_url.path.startswith("drive/folders"):
+                if parsed_url.path.split("/")[-1] not in seen_gdrive_ids:
+                    unique_urls.append(url)
+                    seen_gdrive_ids.add(parsed_url.path.split("/")[-1])
+            elif parsed_url.path == "open" and parsed_url.query.startswith == "id":
+                if parsed_url.query.split("=")[-1] not in seen_gdrive_ids:
+                    unique_urls.append(req.head(url).headers["Location"], allow_redirects=True)
+                    seen_gdrive_ids.add(parsed_url.query.split("=")[-1])
+            elif parsed_url.path.startswith("file/"):
+                if parsed_url.path.split("/")[-2] not in seen_gdrive_ids:
+                    unique_urls.append(url)
+                    seen_gdrive_ids.add(url.split("?")[0].split("/")[-2])
+        elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
+            if url not in unique_urls:
                 unique_urls.append(url)
-                seen.add(url.split("/")[-1].split("?")[0])
-        elif url.startswith("https://drive.google.com/open?id="):
-            if url.split("?id=")[-1] not in seen:
-                unique_urls.append(req.head(url).headers["Location"], allow_redirects=True)
-                seen.add(url.split("/")[-1].split("?")[0])
-        elif url.startswith("https://drive.google.com/file/"):
-            if url.split("?")[0].split("/")[-2] not in seen:
-                unique_urls.append(url)
-                seen.add(url.split("?")[0].split("/")[-2])
-        elif url.startswith("https://www.dropbox.com"):
-            print(" Dropbox link found! attempting to download its files...")
-            download_from_dropbox(url)
-        else:  # TODO: add MEGA, or some sort of other file hosting website(s). gdrive and dropbox seem like the most popular ones atm
+        else:
             pass
     for url in unique_urls:
         if url.startswith("https://drive.google.com/drive/folders/"):
@@ -227,29 +219,28 @@
         elif url.startswith("https://drive.google.com/file/"):
             print(" Google Drive link found! attempting to download its files...")
             download_file_from_google_drive(url.split("?")[0].split("/")[-2])
+        elif url.startswith("https://www.dropbox.com/"):
+            print(" Dropbox link found! attempting to download its files...")
+            download_from_dropbox(url)
     for x in i["attachments"]:
         count += 1
-        while not os.path.exists("{4}/{0}_{1}p_{2}_{3}".format(int(i["id"]) - 1, count, sanitize(i["title"]), x["name"], output)):
+        while not os.path.exists("%s/%d_%dp_%s_%s" % (output, int(i["id"]) - 1, count, sanitize(i["title"]), x["name"])):
             try:
-                downloadfile(i, x, count)
+                download_file(i, x, count)
                 break
-            except HTTPError:
+            except (HTTPError, BadStatusLine):
                 while 1:
                     time.sleep(10)
-                    downloadfile(i, x, count)
-            except BadStatusLine:  # DDoS-GUARD
-                while 1:
-                    time.sleep(10)
-                    downloadfile(i, x, count)
+                    download_file(i, x, count)
             except Exception as e:
                 print(e)
             time.sleep(10)
 
 
-def get_amount_of_posts(s, u):
+def get_amount_of_posts(s: str, u: str) -> int:
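+    # the API pages posts 25 at a time; o= is the offset into the user's post list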
     amount = 0
     while 1:
-        data = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(s, u, amount)).json()
+        data = req.get("https://kemono.party/api/%s/user/%s?o=%d" % (s, u, amount)).json()
         if len(data) < 25:
             return math.ceil((amount + 1) / 25)
         amount += 25
@@ -265,6 +256,14 @@
 
 req = requests.Session()
 
+if args.testdownloadservices:
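+    # exercise the Drive and Dropbox downloaders against hard-coded public test links, then exit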
+    output = "download_services_test"
+    i = {"title": "DEEZ NUTS"}
+    download_folder_from_google_drive("https://drive.google.com/drive/folders/1rZN2ejZnGdF0EpaZuknlDp26a0qSjsEI")
+    download_from_dropbox("https://www.dropbox.com/s/yg405bpznyobo3u/test.txt?dl=0")  # File
+    download_from_dropbox("https://www.dropbox.com/sh/ne3c7bxtkt5tg4s/AABYPNGfHoil4HO_btudw0wPa?dl=0")  # Folder
+    exit()
+
 if args.proxy:
     req.proxies = {}
     if args.proxy[:6] == "socks5":
@@ -293,6 +292,7 @@
 except Exception:
     pass
 
+# TODO: use urlparse here...
 if args.url.split("/")[-2] == "post":
     service = args.url.split("/")[-5]
     user = args.url.split("/")[-3]
@@ -302,7 +302,7 @@
     user = args.url.split("/")[-1]
 
 if not args.output:
-    output = user
+    output = "%s-%s" % (service, user)
 else:
     output = args.output
 
@@ -319,9 +319,9 @@
 for page in range(pages):
     try:
         post
-        userdata = req.get("https://kemono.party/api/{0}/user/{1}/post/{2}".format(service, user, post)).json()
+        userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s" % (service, user, post)).json()
     except Exception:
-        userdata = req.get("https://kemono.party/api/{0}/user/{1}?o={2}".format(service, user, (page * 25))).json()
+        userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s" % (service, user, (page * 25))).json()
     for i in userdata:
         print(i["id"])
         count = 0