changeset 117:40a7b6d9bd3b

officially deprecate kemonopartydownloader.py
committer: GitHub <noreply@github.com>
author Paper <37962225+mrpapersonic@users.noreply.github.com>
date Fri, 03 Mar 2023 22:33:53 +0000
parents 205fc01d5eb4
children eac6dae753ca
files README.md kemonopartydownloader.py
diffstat 2 files changed, 2 insertions(+), 357 deletions(-)
--- a/README.md	Sat Feb 04 13:46:06 2023 -0500
+++ b/README.md	Fri Mar 03 22:33:53 2023 +0000
@@ -24,10 +24,8 @@
 ### [intro.py](intro.py)
 originally created to extract all of the yuru yuri intros
 
-### [kemonopartydownloader.py](kemonopartydownloader.py)
-made this when i was bored<br>
-simple [kemono.party](https://kemono.party) downloader, supports proxies and continuing unfinished downloads<br>
-requires cookies because of DDoS-GUARD, just use a cookies.txt compatible file<br>
+### kemonopartydownloader.py
+This script has been deprecated and removed from this repository. Use [gallery-dl](https://github.com/mikf/gallery-dl) and parse its JSON output for links instead (see the sketch below).
 
 ### [mmltonsf.bat](mmltonsf.bat)
 this is a conversion of a script from bash to batch, it just makes compiling nsfs from mmls easier
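
For reference, the gallery-dl workflow the README now points to could look roughly like the sketch below. It is only a sketch: it assumes gallery-dl is installed and that its --cookies and --dump-json options behave as documented, and the dump_links helper and script name are purely illustrative. It scans the dumped JSON as plain text with the same URL regex the deleted script used, so it does not depend on the exact JSON structure.

# Sketch of the suggested replacement workflow (assumes gallery-dl provides
# --cookies and --dump-json; dump_links and this script name are illustrative).
# usage: python dump_links.py <kemono.party post or creator URL>
import re
import subprocess
import sys

URL_REGEX = (r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]"
             r"|(?:%[0-9a-fA-F][0-9a-fA-F]))+")


def dump_links(post_url: str, cookies: str = "cookies.txt") -> list:
    # Ask gallery-dl to print JSON metadata instead of downloading anything.
    proc = subprocess.run(
        ["gallery-dl", "--cookies", cookies, "--dump-json", post_url],
        capture_output=True, text=True, check=True)
    # Grep the raw JSON text for links rather than relying on its structure.
    return sorted(set(re.findall(URL_REGEX, proc.stdout)))


if __name__ == "__main__":
    for link in dump_links(sys.argv[1]):
        print(link)
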
--- a/kemonopartydownloader.py	Sat Feb 04 13:46:06 2023 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,353 +0,0 @@
-"""
-Usage:
-  kemonopartydownloader.py <url>... (--cookies <filename>)
-                                    [--output <folder>]
-                                    [--proxy <proxy>]
-                                    [--timeout <seconds>]
-  kemonopartydownloader.py -h | --help
-
-Arguments:
-  <url>                        Kemono.party URL to download from
-  -c --cookies <filename>      A Netscape-compatible cookies.txt file
-
-Options:
-  -h --help                    Show this screen
-  -o --output <folder>         Output folder, relative to the current directory
-                               [default: .]
-  -p --proxy <proxy>           HTTP or HTTPS proxy (SOCKS5 with PySocks)
-  -t --timeout <seconds>       Time between downloads [default: 1]
-"""
-import docopt
-import http.cookiejar
-import json
-import os
-import re
-import requests  # pip install requests
-import time
-import math
-import zipfile
-import urllib.parse
-import yt_dlp
-from yt_dlp.utils import sanitize_filename as sanitize
-from urllib.error import HTTPError
-from http.client import BadStatusLine
-
-
-def download_folder_from_google_drive(link: str) -> int:
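-    # Request a Takeout-style export of the Drive folder, poll until the
-    # archive is ready, then download and unzip it.  Uses the module-level
-    # `output` and current post dict `i` set in the main loop below.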
-    takeout_domain = "https://takeout-pa.clients6.google.com"
-    drive_id = link.split("?")[0].split("/")[-1]
-    ses = requests.Session()
-    ses.headers = {
-        'origin': 'https://drive.google.com',
-        'content-type': 'application/json',
-    }
-    key = "AIzaSyC1qbk75NzWBvSaDh6KnsjjA9pIrP4lYIE"  # google anonymous key
-    takeoutjs = ses.post(takeout_domain + "/v1/exports?key=%s" % (key),
-                         data='{"items":[{"id":"%s"}]}' % (drive_id)).json()
-    takeoutid = str(takeoutjs["exportJob"]["id"])
-    storagePath = None
-    while storagePath is None:
-        succeededjson = ses.get(takeout_domain + "/v1/exports/%s?key=%s"
-                                % (takeoutid, key)).json()["exportJob"]
-        if succeededjson["status"] == "SUCCEEDED":
-            storagePath = str(succeededjson["archives"][0]
-                              ["storagePath"])
-        time.sleep(1)
-    size = 0
-    for path, dirs, files in os.walk("./%s/Drive - %s"
-                                     % (output, sanitize(i["title"]))):
-        for f in files:
-            fp = os.path.join(path, f)
-            size += os.path.getsize(fp)
-    try:
-        if size >= int(succeededjson["archives"][0]["sizeOfContents"]):
-            print("  %s already downloaded!" % (succeededjson["archives"][0]
-                                                ["fileName"]))
-            return 1
-    except Exception as e:
-        print("  %s download failed! %s" % (succeededjson["archives"][0]
-                                            ["fileName"], str(e)))
-    response = ses.get(storagePath, stream=True)
-    amountdone = 0
-    with open(succeededjson["archives"][0]["fileName"], "wb") as f:
-        for chunk in response.iter_content(4096):
-            if chunk:  # filter out keep-alive new chunks
-                f.write(chunk)
-                amountdone += len(chunk)
-                print("  downloading %s: %.2f%%"
-                      % (succeededjson["archives"][0]["fileName"],
-                         (amountdone / int(succeededjson["archives"][0]
-                          ["compressedSize"])) * 100), end="\r")
-        print("  downloaded  %s: 100.00%%    "
-              % (succeededjson["archives"][0]["fileName"]))
-    unzip(succeededjson["archives"][0]["fileName"], "./%s/Drive - %s"
-                                                    % (output,
-                                                       sanitize(i["title"])))
-    os.remove(succeededjson["archives"][0]["fileName"])
-    return 0
-
-
-def unzip(src_path: str, dst_dir: str, pwd: str = None) -> None:
-    with zipfile.ZipFile(src_path) as zf:
-        members = zf.namelist()
-        for member in members:
-            arch_info = zf.getinfo(member)
-            arch_name = arch_info.filename.replace('/', os.path.sep)
-            dst_path = os.path.join(dst_dir, arch_name)
-            dst_path = os.path.normpath(dst_path)
-            if not os.path.exists(dst_path):
-                zf.extract(arch_info, dst_dir, pwd)
-
-
-def download_from_dropbox(link: str) -> None:
-    responsehead = req.head(link.split("?")[0] + "?dl=1", allow_redirects=True)
-    if responsehead.status_code == 404:
-        print("  dropbox link not available!")
-        return
-    filename = urllib.parse.unquote(
-               responsehead.headers["Content-Disposition"].split("'")[-1])
-    if not os.path.exists(output + "/Dropbox - " + sanitize(i["title"])):
-        os.makedirs(output + "/Dropbox - " + sanitize(i["title"]))
-    filepath = "%s/Dropbox - %s/%s" % (output, sanitize(i["title"]),
-                                       sanitize(filename))
-    if os.path.exists(filepath):
-        print("  file(s) already downloaded!")
-        return
-    filesize = os.stat(filepath).st_size if os.path.exists(filepath) else 0
-    # will always be 0 if it's a folder...
-    if filesize == 0:
-        with req.get(link.split("?")[0] + "?dl=1", stream=True,
-                     headers={"Range": "bytes=%d-" % (filesize)}) as r:
-            r.raise_for_status()
-            with open(filepath, "ab") as f:
-                for chunk in r.iter_content(chunk_size=4096):
-                    f.write(chunk)
-                    filesize += 4096
-                    print("  file %s downloading..." % (filename), end="\r")
-                print("  file %s successfully downloaded!" % (filename))
-    if filename.endswith(".zip"):
-        unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0]))
-        os.remove(filepath)
-
-
-def download_from_youtube(link: str) -> int:
-    return 0  # stub: YouTube downloads are not implemented yet
-
-
-# https://stackoverflow.com/a/39225039
-def download_file_from_google_drive(drive_id: str, out: str = "") -> None:
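-    # Stream a single Drive file, following the confirm-token flow Google
-    # uses for files it will not virus-scan (see the answer linked above).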
-    def get_confirm_token(response: requests.Response):
-        for key, value in response.cookies.items():
-            if key.startswith('download_warning'):
-                return value
-
-        return None
-
-    def save_response_content(response: requests.Response):
-        amountdone = 0
-        CHUNK_SIZE = 4096
-        filename = sanitize(
-                   response.headers["Content-Disposition"].split("'")[-1])
-        folderpath = "%s/Drive - %s/%s" % (output, sanitize(i["title"]), out)
-        if not os.path.exists(folderpath):
-            os.makedirs(folderpath)
-        destination = "%s/%s" % (folderpath, filename)
-        filesize = (os.stat(destination).st_size
-                    if os.path.exists(destination)
-                    else 0)
-        serverfilesize = int(response.headers["Content-Range"].split('/')[1])
-
-        if (os.path.exists(destination) and filesize == serverfilesize):
-            print("  %s already downloaded!" % os.path.basename(destination))
-            return
-
-        with open(destination, "wb") as f:
-            for chunk in response.iter_content(CHUNK_SIZE):
-                if chunk:  # filter out keep-alive new chunks
-                    f.write(chunk)
-                    amountdone += len(chunk)
-                    print("  downloading %s: %.2f%%"
-                          % (os.path.basename(destination),
-                             (amountdone / serverfilesize) * 100), end="\r")
-            print("  downloaded  %s: %.2f%%    "
-                  % (os.path.basename(destination), 100.0))
-
-    URL = "https://docs.google.com/uc?export=download"
-
-    session = requests.Session()
-
-    headers = {
-        "Range": "bytes=0-",
-    }
-
-    session.proxies = req.proxies
-
-    response = session.get(URL, headers=headers, params={'id': drive_id},
-                           stream=True)
-
-    while response.status_code == 403:
-        time.sleep(30)
-        response = session.get(URL, headers=headers, params={'id': drive_id},
-                               stream=True)
-
-    if response.status_code == 404:
-        return  # bypass when root folder has no files
-
-    token = get_confirm_token(response)
-
-    if token:
-        params = {'id': drive_id, 'confirm': token}
-        response = session.get(URL, headers=headers, params=params,
-                               stream=True)
-
-    save_response_content(response)
-
-
-def find_urls(s: str) -> list:
-    url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" +
-                 """%[0-9a-fA-F][0-9a-fA-F]))+""")
-    urllist = []
-    for findall in re.findall(url_regex, s):
-        urllist.append(findall.split("<")[0].split(">")[-1])
-    return urllist
-
-
-def download_file(i: dict, x: dict, count: int) -> None:
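-    # Download one attachment from kemono.party, resuming a partial file
-    # on disk via an HTTP Range request.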
-    filename = "%s/%s_%dp_%s_%s" % (output, i["id"], count,
-                                    sanitize(i["title"]), x["name"])
-    amountdone = 0
-    filesize = 0
-    if os.path.exists(filename):
-        filesize = os.path.getsize(filename)
-    serverhead = req.head("https://kemono.party/data" + x['path'],
-                          allow_redirects=True)
-    for _ in range(500):  # retry budget; avoid shadowing the post dict 'i'
-        serverfilesize = int(serverhead.headers["Content-Length"])
-        if filesize < serverfilesize:
-            amountdone += filesize
-            with req.get("https://kemono.party/data" + x['path'], stream=True,
-                         headers={"Range": "bytes=%d-" % (filesize)}) as r:
-                r.raise_for_status()
-                with open(filename, "ab") as f:
-                    for chunk in r.iter_content(chunk_size=4096):
-                        f.write(chunk)
-                        amountdone += len(chunk)
-                        print(" downloading image %d: %.2f%%"
-                              % (count, (amountdone / serverfilesize) * 100),
-                              end="\r")
-                    print(" downloaded image " + str(count) + ": 100.00%  ")
-            return
-        else:
-            print(" image " + str(count) + " already downloaded!")
-            return
-        time.sleep(10)
-    print(" download timed out!")
-    return
-
-
-def parse_json(i: dict, count: int) -> None:
-    unique_urls = []
-    for url in find_urls(i["content"]):
-        # spaghetti
-        parsed_url = urllib.parse.urlparse(url)
-        if parsed_url.netloc == "drive.google.com":
-            if parsed_url.path.startswith("/drive/folders"):
-                if url not in unique_urls:
-                    download_folder_from_google_drive(url)
-                    unique_urls.append(url)
-            elif (parsed_url.path == "/open" and
-                  parsed_url.query.startswith("id")):
-                if url not in unique_urls:
-                    download_file_from_google_drive(
-                                                   parsed_url.query.split("=")
-                                                   [-1])
-                    unique_urls.append(url)
-            elif parsed_url.path.startswith("/file/"):
-                if url not in unique_urls:
-                    download_file_from_google_drive(parsed_url.path.split("/")
-                                                    [-2])
-                    unique_urls.append(url)
-        elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]:
-            if url not in unique_urls:
-                download_from_dropbox(url)
-                unique_urls.append(url)
-        elif parsed_url.netloc in ["youtube.com", "youtu.be", "www.youtube.com"]:
-            if url not in unique_urls:
-                download_from_youtube(url)
-                unique_urls.append(url)
-    for x in i["attachments"]:
-        count += 1
-        while not os.path.exists("%s/%s_%dp_%s_%s"
-                                 % (output, i["id"], count,
-                                    sanitize(i["title"]), x["name"])):
-            try:
-                download_file(i, x, count)
-            except (HTTPError, BadStatusLine):
-                while 1:
-                    time.sleep(10)
-                    try:
-                        download_file(i, x, count)
-                        break  # resumed download finished
-                    except (HTTPError, BadStatusLine):
-                        pass
-            except Exception as e:
-                print(e)
-            time.sleep(timeout)
-
-
-def get_amount_of_posts(s: str, u: str) -> int:
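-    # Page through the API 25 posts at a time to count how many pages exist.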
-    amount = 0
-    while 1:
-        data = req.get("https://kemono.party/api/%s/user/%s?o=%d"
-                       % (s, u, amount)).json()
-        if len(data) < 25:
-            return math.ceil((amount + 1) / 25)
-        amount += 25
-
-
-args = docopt.docopt(__doc__)
-
-req = requests.Session()
-
-if args["--proxy"]:
-    req.proxies = {
-        "http": args["--proxy"],
-        "https": args["--proxy"],
-    }
-
-timeout = int(args["--timeout"])
-
-cj = http.cookiejar.MozillaCookieJar(args["--cookies"])
-cj.load(ignore_expires=True)
-req.cookies = cj
-
-for url in args["<url>"]:
-    if url.isnumeric():
-        print("do not input user IDs in --url! use a link instead")
-        continue
-
-    if url.split("/")[-2] == "post":
-        service = url.split("/")[-5]
-        user = url.split("/")[-3]
-        post = url.split("/")[-1]
-        pages = 1
-    elif url.split("/")[-2] == "user":
-        service = url.split("/")[-3]
-        user = url.split("/")[-1]
-        post = None  # creator page: no single post selected
-        pages = get_amount_of_posts(service, user)
-
-    output = "%s/%s-%s" % (args["--output"], service, user)
-
-    if not os.path.exists(output):
-        os.makedirs(output)
-
-    for page in range(pages):
-        if post:
-            userdata = req.get("https://kemono.party/api/%s/user/%s/post/%s"
-                               % (service, user, post)).json()
-        else:
-            userdata = req.get("https://kemono.party/api/%s/user/%s?o=%s"
-                               % (service, user, (page * 25))).json()
-        for i in userdata:
-            print(i["id"])
-            count = 0
-            parse_json(i, count)
-            filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count,
-                                                   sanitize(i["title"]))
-            with open(filename, "w") as f:
-                json.dump(i, f)