view channeldownloader.py @ 133:0d8eabdd12ab default tip

create: write H:MM:SS timestamps, add option to fill with gaussian-blur instead of black many albums are longer than one hour so writing H:MM:SS is a necessity. if anything there will just be verbose info that isn't important for my use-case. however the gaussian-blur is simply broken. It works, and it plays locally just fine, but YouTube in particular elongates the video to fit the full width. I'm not entirely sure why it does this, but it makes it useless and ugly.
author Paper <paper@tflc.us>
date Sat, 03 Jan 2026 20:25:38 -0500
parents 8ec0e91a5dcf
children
line wrap: on
line source

#!/usr/bin/env python3
"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         YTPMV_Database compatible JSON file
"""
from __future__ import print_function
import docopt
import internetarchive
try:
    import orjson as json
except ImportError:
    import json
import os
import re
import time
import urllib.request
import requests  # need this for ONE (1) exception
import yt_dlp as youtube_dl
from urllib.error import HTTPError
from yt_dlp.utils import sanitize_filename, DownloadError
from pathlib import Path
from requests.exceptions import ConnectTimeout


class MyLogger(object):
    """Quiet logger handed to yt-dlp: swallow debug/warning chatter and
    print only errors, indented to match the script's progress output."""

    def debug(self, msg):
        """Ignore debug messages."""
        pass

    def warning(self, msg):
        """Ignore warnings."""
        pass

    def error(self, msg):
        """Print an error message (removed redundant trailing ``pass``)."""
        print(" " + msg)


def ytdl_hook(d) -> None:
    """yt-dlp progress hook: print a one-line status for the current file.

    ``d`` is the status dict yt-dlp passes to progress hooks; only the
    "finished", "downloading" and "error" states produce output.
    """
    fname = os.path.basename(d["filename"])
    status = d["status"]
    if status == "finished":
        print(" downloaded %s:    100%% " % (fname))
    elif status == "downloading":
        # carriage return keeps the progress on a single updating line
        print(" downloading %s: %s\r" % (fname, d["_percent_str"]), end="")
    elif status == "error":
        print("\n an error occurred downloading %s!" % (fname))


def load_split_files(path: str):
    """Yield parsed JSON database(s) found at *path*.

    If *path* is a single file, yield its parsed contents and stop.
    If it is a directory, yield every file matching ``vids*.json`` in it.

    Fixes: the original fell through to ``os.listdir(path)`` after the
    single-file case and crashed with NotADirectoryError; it also used
    ``json.load``, which does not exist when orjson is aliased as ``json``.
    """
    if not os.path.isdir(path):
        with open(path, "r", encoding="utf-8") as infile:
            # json.loads works with both stdlib json and orjson
            yield json.loads(infile.read())
        return
    for fi in os.listdir(path):
        if re.search(r"vids[0-9\-]+?\.json", fi):
            with open(path + "/" + fi, "r", encoding="utf-8") as infile:
                print(fi)
                yield json.loads(infile.read())


def reporthook(count: int, block_size: int, total_size: int) -> None:
    """``urllib.request.urlretrieve`` progress callback: print percent done.

    Fixes: guard against ``total_size`` <= 0 (server sent no
    Content-Length, urllib passes -1) which previously produced a
    nonsense/negative percentage, and clamp to 100 because the final
    block usually overshoots the total.
    """
    global start_time
    if count == 0:
        # first call: record the start time for this transfer
        start_time = time.time()
        return
    if total_size <= 0:
        return
    percent = min(int(count * block_size * 100 / total_size), 100)
    print(" downloading %d%%        \r" % (percent), end="")


def write_metadata(i: dict, basename: str) -> None:
    """Save a video's info-JSON and description as sidecar files.

    Writes ``<basename>.info.json`` and ``<basename>.description`` next
    to the media file; files that already exist are left untouched.
    """
    info_path = basename + ".info.json"
    if not os.path.exists(info_path):
        with open(info_path, "w", encoding="utf-8") as jsonfile:
            dumped = json.dumps(i)
            try:
                # orjson returns bytes; stdlib json returns str
                dumped = dumped.decode("utf-8")
            except AttributeError:
                pass
            jsonfile.write(dumped)
            print(" saved %s" % os.path.basename(jsonfile.name))
    desc_path = basename + ".description"
    if not os.path.exists(desc_path):
        with open(desc_path, "w", encoding="utf-8") as descfile:
            descfile.write(i["description"])
            print(" saved %s" % os.path.basename(descfile.name))


def wayback_machine_dl(video: dict, basename: str) -> int:
    """Download a video from the Wayback Machine's fake-URL YouTube mirror.

    Returns 1 when the caller should retry (transient timeout), 0 when the
    caller should stop (success, video not archived, or unknown error).

    Fixes: the Content-Type probe response from ``urlopen`` was never
    closed (leaked connection); it is now managed with ``with``.
    """
    try:
        url = ''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu",
                       "rl.archive.org/yt/%s"])
        # probe the Content-Type first so we can pick a file extension
        with urllib.request.urlopen(url % video["id"]) as response:
            contenttype = response.getheader("Content-Type")
        if contenttype == "video/webm":
            ext = "webm"
        elif contenttype == "video/mp4":
            ext = "mp4"
        else:
            # any other type means there is no playable archived copy
            raise HTTPError(url=None, code=None, msg=None,
                            hdrs=None, fp=None)
        urllib.request.urlretrieve(url % video["id"], "%s.%s" % (basename, ext),
                                   reporthook)
        print(" downloaded %s.%s" % (basename, ext))
        return 0
    except TimeoutError:
        return 1  # transient: caller sleeps and retries
    except HTTPError:
        print(" video not available on the Wayback Machine!")
        return 0
    except Exception as e:
        print(" unknown error downloading video!\n")
        print(e)
        return 0


def ia_file_legit(path: str, vidid: str) -> bool:
    """Return True if *path* looks like a file belonging to video *vidid*.

    Matches ``[<title>-]<vidid>.<ext>`` where ``<ext>`` is one of the media
    or metadata extensions archived alongside a video.

    Fixes: the old pattern contained ``r"info\\.json"`` — inside a raw
    string that is the regex ``info\\.json`` (a literal backslash), so real
    ``.info.json`` filenames never matched. Also escapes the dot in
    ``annotations.xml`` and the video id itself.
    """
    pattern = (r"((?:.+?-)?" + re.escape(vidid) +
               r"\.(?:mp4|jpg|webp|mkv|webm|info\.json|description|"
               r"annotations\.xml))")
    return bool(re.search(pattern, path))


def internet_archive_dl(video: dict, basename: str, output: str) -> int:
    """Download a video's files from its archive.org "youtube-<id>" item.

    Returns 1 when an IA item exists for the video (download attempted),
    0 when there is no item or the download failed with a non-timeout
    error. The caller treats 1 as "handled, move on".
    """
    if internetarchive.get_item("youtube-%s" % video["id"]).exists:
        # keep only the files that actually belong to this video id
        flist = [f.name for f in internetarchive.get_files("youtube-%s" % video["id"]) if ia_file_legit(f.name, video["id"])]
        while True:
            try:
                internetarchive.download("youtube-%s" % video["id"],
                                         files=flist, verbose=True,
                                         destdir=output,
                                         no_directory=True,
                                         ignore_existing=True,
                                         retries=9999)
                break
            except ConnectTimeout:
                # transient network timeout: restart the whole download
                continue
            except Exception as e:
                print(e)
                return 0
        # if the archived filenames start with the bare video id, rename
        # them to follow this script's "<title>-<id>.<ext>" scheme.
        # NOTE(review): flist[0] raises IndexError when no file matched
        # ia_file_legit — confirm IA items always contain at least one.
        if flist[0][:len(video["id"])] == video["id"]:
            for fname in flist:
                if os.path.exists("%s/%s" % (output, fname)):
                    os.replace("%s/%s" % (output, fname),
                               "%s-%s" % (basename.rsplit("-", 1)[0],
                                          fname))
        return 1
    return 0


# Shared yt-dlp option set: fetch the best quality, write every metadata
# sidecar file, and stay quiet except for our own progress output.
# main() adds the per-channel "outtmpl" key before each download.
ytdl_opts = {
    "retries": 100,
    "nooverwrites": True,  # never clobber an existing download
    "call_home": False,
    "quiet": True,  # output is routed through MyLogger/ytdl_hook instead
    "writeinfojson": True,
    "writedescription": True,
    "writethumbnail": True,
    "writeannotations": True,
    "writesubtitles": True,
    "allsubtitles": True,
    "addmetadata": True,
    "continuedl": True,  # resume partially-downloaded files
    "embedthumbnail": True,
    "format": "bestvideo+bestaudio/best",
    "restrictfilenames": True,  # keep filenames ASCII/shell-safe
    "no_warnings": True,
    "progress_hooks": [ytdl_hook],
    "logger": MyLogger(),
    "ignoreerrors": False,  # raise DownloadError so main() can fall back to IA
}


def main():
    """For every video in the database whose uploader matches one of the
    requested channel URLs, download it: try YouTube first, then the
    video's Internet Archive item, then the Wayback Machine mirror."""
    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    for f in load_split_files(args["--database"]):
        for i in f:
            uploader = i["uploader_id"] if "uploader_id" in i else None
            for url in args["<url>"]:
                # channel id is the last path segment of the URL
                channel = url.split("/")[-1]

                # each channel gets its own subfolder under --output
                output = "%s/%s" % (args["--output"], channel)
                if not os.path.exists(output):
                    os.mkdir(output)
                ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"

                if uploader == channel:
                    print(uploader, channel)
                    print("%s:" % i["id"])
                    basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
                                             restricted=True), i["id"])
                    # skip videos whose media file is already on disk
                    files = [y for p in ["mkv", "mp4", "webm"] for y in list(Path(output).glob(("*-%s." + p) % i["id"]))]
                    if files:
                        print(" video already downloaded!")
                        write_metadata(i, basename)
                        continue
                    # this code is *really* ugly... todo a rewrite?
                    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
                        try:
                            ytdl.extract_info("https://youtube.com/watch?v=%s"
                                              % i["id"])
                            continue
                        except DownloadError:
                            print(" video is not available! attempting to find In"
                                  "ternet Archive pages of it...")
                        except Exception as e:
                            print(" unknown error downloading video!\n")
                            print(e)
                    if internet_archive_dl(i, basename, output):  # 1 = an IA item existed, handled
                        continue
                    print(" video does not have a Internet Archive page! attem"
                          "pting to download from the Wayback Machine...")
                    while True:
                        if wayback_machine_dl(i, basename) == 0:  # 0 = done (success or permanent failure)
                            break
                        time.sleep(5)  # 1 = timeout; wait and retry
                        continue
                    write_metadata(i, basename)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()