view channeldownloader.py @ 117:40a7b6d9bd3b

officially deprecate kemonopartydownloader.py committer: GitHub <noreply@github.com>
author Paper <37962225+mrpapersonic@users.noreply.github.com>
date Fri, 03 Mar 2023 22:33:53 +0000
parents 80bd4a99ea00
children eac6dae753ca
line wrap: on
line source

#!/usr/bin/env python3
"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
                                [--proxy <proxy>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         JSON video database: a single file, or a
                               folder of vids*.json split files
"""
from __future__ import print_function
import docopt
import internetarchive
try:
    import orjson as json
except ImportError:
    import json
import os
import re
import time
try:
    import urllib.request as compat_urllib
    from urllib.error import HTTPError
except ImportError:  # Python 2
    import urllib as compat_urllib
    from urllib2 import HTTPError
try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
except ImportError:
    try:
        import youtube_dl
        from youtube_dl.utils import sanitize_filename, DownloadError
    except ImportError:
        print("ERROR: youtube-dl/yt-dlp not installed!")
        exit(1)
from io import open  # for Python 2 compatibility, in Python 3 this
                     # just maps to the built-in function

class MyLogger(object):
    """Logger object passed to youtube-dl/yt-dlp via the "logger" option.

    Debug and warning messages are dropped; only errors reach the console.
    """

    def debug(self, msg):
        """Discard the debug message."""

    def warning(self, msg):
        """Discard the warning message."""

    def error(self, msg):
        """Print the error message, indented by one space."""
        print(" " + msg)

def ytdl_hook(d):
    """Progress hook for youtube-dl/yt-dlp: print a one-line status update.

    d is the status dictionary handed to the hook. Only the "status",
    "filename" and "_percent_str" entries are read; statuses other than
    "finished", "downloading" and "error" are ignored.

    Missing entries are tolerated instead of raising KeyError, because an
    exception escaping from a progress hook would interrupt the download.
    """
    status = d.get("status")
    name = os.path.basename(d.get("filename") or "unknown file")
    if status == "finished":
        print(" downloaded %s:    100%% " % (name))
    elif status == "downloading":
        # "\r" + end="" keeps rewriting the same console line.
        print(" downloading %s: %s\r" % (name, d.get("_percent_str", "N/A%")), end="")
    elif status == "error":
        print("\n an error occurred downloading %s!" % (name))

def load_split_files(path):
    """Load the video database from a single JSON file or a folder of splits.

    If path is a directory, every entry whose *whole* name matches
    vids<digits/dashes>.json is parsed and its contents are concatenated,
    in sorted file-name order, into {"videos": [...]}. Other entries
    (backups such as "vids1.json.bak", unrelated files) are skipped.

    Otherwise path is read as one JSON document and returned as parsed.
    """
    if os.path.isdir(path):
        result = {"videos": []}
        # sorted() makes the load order independent of the file system.
        for name in sorted(os.listdir(path)):
            # Anchor the pattern to the full name: a substring match would
            # open a different file than the one that was listed.
            if not re.match(r"vids[0-9\-]+?\.json\Z", name):
                continue
            with open(os.path.join(path, name), "r", encoding="utf-8") as infile:
                result["videos"].extend(json.loads(infile.read()))
        return result
    # Context manager so the handle is closed deterministically.
    with open(path, "r", encoding="utf-8") as infile:
        return json.loads(infile.read())

def reporthook(count, block_size, total_size):
    """Progress callback for urlretrieve: print the download progress.

    count       number of blocks transferred so far
    block_size  size of one block in bytes
    total_size  total size in bytes; zero or negative when it is unknown

    The first call (count == 0) only records the start time in the global
    start_time. Later calls print a percentage capped at 100, or the
    number of bytes received when the total size is unknown.
    """
    global start_time
    if count == 0:
        start_time = time.time()
        return
    received = count * block_size
    if total_size <= 0:
        # No usable total: a percentage would be negative or divide by zero.
        print(" downloading %d bytes        \r" % (received), end="")
        return
    # The last block is usually partial, so count * block_size can overshoot.
    percent = min(100, int(received * 100 / total_size))
    print(" downloading %d%%        \r" % (percent), end="")

# Parse the command line against the usage text in the module docstring.
args = docopt.docopt(__doc__)

# Options passed to every YoutubeDL instance. "outtmpl" is not set here; the
# download loop fills it in once the channel output folder is known.
ytdl_opts = {
    "retries": 100,
    "nooverwrites": True,
    "call_home": False,
    "quiet": True,
    "writeinfojson": True,
    "writedescription": True,
    "writethumbnail": True,
    "writeannotations": True,
    "writesubtitles": True,
    "allsubtitles": True,
    # This key used to appear twice in the literal (True, then False); the
    # later False always won, so only that effective value is kept. The
    # download loop catches DownloadError to fall back to archived copies.
    "ignoreerrors": False,
    "addmetadata": True,
    "continuedl": True,
    "embedthumbnail": True,
    "format": "bestvideo+bestaudio/best",
    "restrictfilenames": True,
    "no_warnings": True,
    "progress_hooks": [ytdl_hook],
    "logger": MyLogger(),
}

# makedirs rather than mkdir so a nested --output such as "a/b" also works.
if not os.path.exists(args["--output"]):
    os.makedirs(args["--output"])

# Main loop. For every video in the database and every requested channel URL
# whose last path component equals the video's uploader_id, try in order:
#   1. a regular youtube-dl/yt-dlp download,
#   2. the "youtube-<id>" item on the Internet Archive,
#   3. the Wayback Machine copy of the video stream,
# and, unless step 1 succeeded, write .info.json / .description files from
# the database entry.
for i in load_split_files(args["--database"])["videos"]:
    uploader = i.get("uploader_id")
    for url in args["<url>"]:
        # Strip trailing slashes so ".../channel/" still yields "channel".
        channel = url.rstrip("/").split("/")[-1]

        output = "%s/%s" % (args["--output"], channel)
        if not os.path.exists(output):
            os.mkdir(output)
        ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"

        if uploader != channel:
            continue

        print("%s:" % i["id"])
        title = sanitize_filename(i["title"], restricted=True)

        # Skip videos that already have a video file in the output folder.
        existing = set(os.listdir(output))
        candidates = set()
        for video_ext in ("mp4", "mkv", "webm"):
            # Name written by the Internet Archive and Wayback branches
            # below: sanitized title, then the raw id.
            # NOTE(review): presumably also what outtmpl produces with
            # restrictfilenames - confirm against youtube-dl/yt-dlp.
            candidates.add("%s-%s.%s" % (title, i["id"], video_ext))
            # Name the earlier version of this check looked for (the whole
            # file name sanitized at once); kept so that nothing that used
            # to be recognised is downloaded again.
            candidates.add(sanitize_filename("%s-%s.%s" % (i["title"], i["id"], video_ext), restricted=True))
        if existing & candidates:
            print(" video already downloaded!")
            continue

        # 1. regular download
        with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
            try:
                ytdl.extract_info("https://youtube.com/watch?v=%s" % i["id"])
                continue
            except DownloadError:
                print(" video is not available! attempting to find Internet Archive pages of it...")
            except Exception as e:
                print(" unknown error downloading video!\n")
                print(e)

        item_id = "youtube-%s" % i["id"]
        if internetarchive.get_item(item_id).exists:  # 2. download from internetarchive if available
            wanted = re.compile(r"((?:.+?-)?%s\.(?:mp4|jpg|webp|mkv|webm|info\.json|description))" % re.escape(i["id"]))
            flist = [f.name for f in internetarchive.get_files(item_id) if wanted.search(f.name)]
            if flist:
                internetarchive.download(item_id, files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True, retries=9999)
            else:
                # NOTE(review): this branch means the item holds no matching
                # files, which the message below does not really say -
                # confirm the intended wording.
                print(" video already downloaded!")
                continue
            if os.path.exists("%s/%s.info.json" % (output, i["id"])):  # will always exist no matter which setting was used to download
                # Prefix the downloaded files with the sanitized title so
                # they follow the "<title>-<id>.<ext>" naming used above.
                for fname in flist:
                    src = output + "/" + fname
                    dst = output + "/" + title + "-" + fname
                    if os.path.exists(src) and not os.path.exists(dst):
                        os.rename(src, dst)
            else:
                print("ID file not found!")
        else:
            # 3. Wayback Machine
            print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...")
            wayback_url = "https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"]
            try:  # we could use yt-dlp's extractor, but then we would need to craft a fake wayback machine url,
                  # and we wouldn't even know if it worked. so let's continue using our little "hack"
                headers = compat_urllib.urlopen(wayback_url)
                try:
                    if hasattr(headers.info(), "getheader"):
                        contenttype = headers.info().getheader("Content-Type")
                    else:
                        contenttype = headers.getheader("Content-Type")
                finally:
                    # Only the headers are needed; urlretrieve opens its own
                    # connection, so release this one right away.
                    headers.close()
                ext = {"video/webm": "webm", "video/mp4": "mp4"}.get(contenttype)
                if ext is None:
                    # Any other content type is treated as "not archived".
                    raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None)
                compat_urllib.urlretrieve(wayback_url, "%s/%s-%s.%s" % (output, title, i["id"], ext), reporthook)
                print(" downloaded %s-%s.%s" % (title, i["id"], ext))
            except HTTPError:
                print(" video not available on the Wayback Machine!")
            except Exception as e:
                print(" unknown error downloading video!\n")
                print(e)

        # metadata
        basename = "%s/%s-%s" % (output, title, i["id"])
        if not os.path.exists(basename + ".info.json"):
            # Serialize before opening the file so a failure cannot leave an
            # empty .info.json behind. dumps() may return bytes (orjson, or
            # str on Python 2); the text-mode file needs unicode.
            serialized = json.dumps(i)
            if isinstance(serialized, bytes):
                serialized = serialized.decode("utf-8")
            with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
                jsonfile.write(serialized)
                print(" saved %s" % os.path.basename(jsonfile.name))
        # Entries without a description get no .description file instead of
        # crashing the whole run with a KeyError.
        description = i.get("description")
        if description is not None and not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w", encoding="utf-8") as descfile:
                descfile.write(description)
                print(" saved %s" % os.path.basename(descfile.name))