view channeldownloader.py @ 118:eac6dae753ca

*: major cleanup committer: GitHub <noreply@github.com>
author Paper <37962225+mrpapersonic@users.noreply.github.com>
date Fri, 03 Mar 2023 22:51:28 +0000
parents 80bd4a99ea00
children 196cf2e3d96e
line wrap: on
line source

#!/usr/bin/env python3
"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
                                [--proxy <proxy>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         JSON video database: a single file, or a
                               directory of vids*.json files
  --proxy <proxy>              HTTP or HTTPS proxy (SOCKS5 with PySocks)
"""
from __future__ import print_function
import docopt
import internetarchive
try:
    import orjson as json
except ImportError:
    import json
import os
import re
import time
import urllib.request
import requests  # need this for ONE (1) exception
import yt_dlp as youtube_dl
from urllib.error import HTTPError
from yt_dlp.utils import sanitize_filename, DownloadError
from pathlib import Path
from requests.exceptions import ConnectTimeout


class MyLogger(object):
    """Logger handed to yt-dlp: swallows debug/warning noise, prints errors."""

    def debug(self, msg):
        # debug output is suppressed; ytdl_hook handles progress display
        pass

    def warning(self, msg):
        # warnings are suppressed
        pass

    def error(self, msg):
        # errors are surfaced, indented to match the rest of the output
        print(" " + msg)


def ytdl_hook(d) -> None:
    """yt-dlp progress hook: print one short status line per event."""
    status = d["status"]
    if status not in ("finished", "downloading", "error"):
        return
    fname = os.path.basename(d["filename"])
    if status == "downloading":
        # \r + end="" keeps the progress on a single updating line
        print(" downloading %s: %s\r" % (fname, d["_percent_str"]), end="")
    elif status == "finished":
        print(" downloaded %s:    100%% " % (fname))
    else:
        print("\n an error occurred downloading %s!" % (fname))


def load_split_files(path: str) -> dict:
    """Load the video database.

    ``path`` is either a directory containing split files named like
    ``vids0-1000.json`` (each a JSON list of video dicts), whose contents
    are merged into one ``{"videos": [...]}`` dict, or a single JSON file
    which is parsed and returned as-is.
    """
    if not os.path.isdir(path):
        # single-file database; the original leaked this file handle
        with open(path, "r", encoding="utf-8") as infile:
            return json.loads(infile.read())
    result = {"videos": []}
    for entry in os.listdir(path):
        # only pick up the split database files, e.g. "vids0-1000.json"
        for fname in re.findall(r"vids[0-9\-]+?\.json", entry):
            fpath = os.path.join(path, fname)
            with open(fpath, "r", encoding="utf-8") as infile:
                result["videos"].extend(json.loads(infile.read()))
    return result


def reporthook(count: int, block_size: int, total_size: int) -> None:
    """Progress callback for urllib.request.urlretrieve.

    ``count`` is the number of blocks transferred so far, ``block_size``
    the block size in bytes, ``total_size`` the expected total.  urlretrieve
    passes -1 when the server sends no Content-Length, so guard against
    non-positive totals instead of dividing by them.
    """
    global start_time
    if count == 0:
        # first call of a transfer: remember when it started
        start_time = time.time()
        return
    if total_size <= 0:
        # unknown length: no percentage can be computed
        return
    # cap at 100: the final block usually overshoots total_size
    percent = min(int(count * block_size * 100 / total_size), 100)
    print(" downloading %d%%        \r" % (percent), end="")


def write_metadata(i: dict, basename: str) -> None:
    """Write ``<basename>.info.json`` and ``<basename>.description``.

    ``i`` is the video's metadata dict; its "description" value is written
    verbatim.  Files that already exist are left untouched.
    """
    info_path = basename + ".info.json"
    if not os.path.exists(info_path):
        with open(info_path, "w", encoding="utf-8") as jsonfile:
            serialized = json.dumps(i)
            try:
                # orjson.dumps returns bytes; decode before writing
                jsonfile.write(serialized.decode("utf-8"))
            except AttributeError:
                # stdlib json.dumps already returns str
                jsonfile.write(serialized)
            print(" saved %s" % os.path.basename(jsonfile.name))
    desc_path = basename + ".description"
    if not os.path.exists(desc_path):
        with open(desc_path, "w", encoding="utf-8") as descfile:
            descfile.write(i["description"])
            print(" saved %s" % os.path.basename(descfile.name))


# Options handed to yt_dlp.YoutubeDL; "outtmpl" is filled in per channel
# at runtime inside main().
ytdl_opts = {
    "retries": 100,  # archival job: retry hard rather than give up
    "nooverwrites": True,  # never clobber already-downloaded files
    "call_home": False,
    "quiet": True,  # MyLogger/ytdl_hook do all the printing
    "writeinfojson": True,  # keep every sidecar metadata file we can get
    "writedescription": True,
    "writethumbnail": True,
    "writeannotations": True,
    "writesubtitles": True,
    "allsubtitles": True,
    "addmetadata": True,
    "continuedl": True,  # resume partial downloads
    "embedthumbnail": True,
    "format": "bestvideo+bestaudio/best",
    "restrictfilenames": True,  # matches sanitize_filename(restricted=True) in main()
    "no_warnings": True,
    "progress_hooks": [ytdl_hook],
    "logger": MyLogger(),
    "ignoreerrors": False,
}


def wayback_machine_dl(video: dict, basename: str) -> int:
    """Try to fetch a video via the Wayback Machine's fake-URL endpoint.

    Returns 1 on a timeout (caller retries after a delay) and 0 otherwise
    (success, video not archived, or any other error).
    """
    url = ''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu",
                   "rl.archive.org/yt/%s"])
    try:
        # probe the Content-Type first to pick the file extension; use a
        # context manager so the probe connection is closed, not leaked
        with urllib.request.urlopen(url % video["id"]) as response:
            contenttype = response.getheader("Content-Type")
        if contenttype == "video/webm":
            ext = "webm"
        elif contenttype == "video/mp4":
            ext = "mp4"
        else:
            # not a playable capture: reuse the "not available" handling
            raise HTTPError(url=None, code=None, msg=None,
                            hdrs=None, fp=None)
        urllib.request.urlretrieve(url % video["id"],
                                   "%s.%s" % (basename, ext), reporthook)
        print(" downloaded %s.%s" % (basename, ext))
        return 0
    except TimeoutError:
        return 1
    except HTTPError:
        print(" video not available on the Wayback Machine!")
        return 0
    except Exception as e:
        print(" unknown error downloading video!\n")
        print(e)
        return 0

def internet_archive_dl(video: dict, basename: str) -> int:
    """Download a video's files from its "youtube-<id>" Internet Archive item.

    Files land in the directory part of ``basename`` (which main() builds
    as "<output>/<title>-<id>") and are renamed to the local naming scheme
    when the IA copies lack a title prefix.  Returns 1 when the IA item
    exists, 0 otherwise.
    """
    item_id = "youtube-%s" % video["id"]
    if not internetarchive.get_item(item_id).exists:
        return 0
    # FIX: the original referenced a global `output` that only exists as a
    # local inside main() (NameError at runtime); derive it from basename.
    output = os.path.dirname(basename)
    # FIX: `info\\.json` in a raw string demanded a literal backslash in
    # the filename, silently excluding .info.json files; also escape the
    # dot in annotations.xml and compile the pattern once.
    rx = re.compile(''.join([r"((?:.+?-)?", video["id"],
                             r"\.(?:mp4|jpg|webp|mkv|webm|info\.json|"
                             r"description|annotations\.xml))"]))
    flist = [fname for fname in
             (f.name for f in internetarchive.get_files(item_id))
             if rx.search(fname)]
    while True:
        try:
            internetarchive.download(item_id,
                                     files=flist, verbose=True,
                                     destdir=output,
                                     no_directory=True,
                                     ignore_existing=True,
                                     retries=9999)
            break
        except ConnectTimeout:
            # transient network timeout: keep trying
            continue
        except Exception:
            return 0
    # IA files that start with the bare video id lack the "<title>-"
    # prefix; rename them to match the local "<title>-<file>" scheme.
    # (guard against flist being empty — the old code indexed flist[0])
    if flist and flist[0][:len(video["id"])] == video["id"]:
        for fname in flist:
            src = "%s/%s" % (output, fname)
            if os.path.exists(src):
                os.replace(src, "%s-%s" % (basename.rsplit("-", 1)[0],
                                           fname))
    return 1

def main():
    """Entry point: parse CLI args, then try each video in the database
    against each requested channel, downloading from YouTube first and
    falling back to the Internet Archive and the Wayback Machine."""
    args = docopt.docopt(__doc__)

    # makedirs(exist_ok=True) also creates intermediate directories and
    # avoids the check-then-create race of os.path.exists + os.mkdir
    os.makedirs(args["--output"], exist_ok=True)

    for video in load_split_files(args["--database"])["videos"]:
        uploader = video.get("uploader_id")
        for url in args["<url>"]:
            channel = url.split("/")[-1]

            output = "%s/%s" % (args["--output"], channel)
            os.makedirs(output, exist_ok=True)
            ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"

            # only handle videos uploaded by the requested channel
            if uploader != channel:
                continue
            print("%s:" % video["id"])
            basename = "%s/%s-%s" % (output,
                                     sanitize_filename(video["title"],
                                                       restricted=True),
                                     video["id"])
            # skip videos we already have a media file for
            path = Path(output)
            files = list(path.glob("*-%s.mkv" % video["id"]))
            files.extend(path.glob("*-%s.mp4" % video["id"]))
            files.extend(path.glob("*-%s.webm" % video["id"]))
            if files:
                print(" video already downloaded!")
                write_metadata(video, basename)
                continue
            # 1) try YouTube itself first
            with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
                try:
                    ytdl.extract_info("https://youtube.com/watch?v=%s"
                                      % video["id"])
                    continue
                except DownloadError:
                    print(" video is not available! attempting to find In"
                          "ternet Archive pages of it...")
                except Exception as e:
                    print(" unknown error downloading video!\n")
                    print(e)
            # 2) fall back to the Internet Archive item, then 3) the
            #    Wayback Machine (retrying every 5 s while it times out)
            if internet_archive_dl(video, basename) == 0:
                print(" video does not have a Internet Archive page! attem"
                      "pting to download from the Wayback Machine...")
                while wayback_machine_dl(video, basename) != 0:
                    time.sleep(5)
            write_metadata(video, basename)


if __name__ == "__main__":
    main()