view channeldownloader.py @ 14:03c8fd4069fb default tip

*: big refactor, switch to GPLv2, and add README Okay: now, we use a modular approach for downloaders. Each downloader is provided through a single function (which does the fetching). Additionally, the internetarchive library is optional now if the user does not want to install it. yt-dlp is still necessary though for its sanitize_filename function. If and when I get to adding vanity features (such as finding the best possible source by comparing resolution and bitrate), I'll probably separate out all of the downloaders into different files. I also moved this project to a separate repository from 'codedump', keeping all of the relevant commit history :)
author Paper <paper@tflc.us>
date Sat, 30 Aug 2025 17:09:56 -0400
parents 2e7a3725ad21
children
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes a youtube channel's videos from
# a variety of sources

# Copyright (c) 2021-2025 Paper <paper@tflc.us>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         yt-dlp style database of videos. Should contain
                               an array of yt-dlp .info.json data. For example,
                               FinnOtaku's YTPMV metadata archive.
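
Example (hypothetical channel name and file paths):
  channeldownloader.py https://youtube.com/user/ExampleChannel --database vids.json --output archive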
"""

# Standard library imports (no possible missing dependencies)
import os
import re
import time
import urllib.request
from urllib.error import HTTPError
from pathlib import Path

# docopt is third-party, but it is a hard dependency of this script
import docopt

# We can utilize special simdjson features if it is available
simdjson = False

try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

ytdlp_works = False

try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    # yt-dlp is still a hard dependency: we need its sanitize_filename
    # function to build output filenames, so there is no point in
    # continuing without it.
    raise SystemExit("failed to import yt-dlp! it is required for its "
                     "sanitize_filename function.")

ia_works = False

try:
    import internetarchive
    from requests.exceptions import ConnectTimeout
    ia_works = True
except ImportError:
    print("failed to import the Internet Archive's python library!")
    print("downloading from IA will not work.")

##############################################################################
## DOWNLOADERS

# All downloaders should be a function under this signature:
#    dl(video: dict, basename: str, output: str) -> int
# where:
#    'video': the .info.json scraped from the YTPMV metadata archive.
#    'basename': the output path, minus the file extension, to write to.
#    'output': the output directory.
# yes, it's weird, but I don't care ;)
#
# Magic return values:
#  0 -- all good, video is downloaded
#  1 -- error downloading video; it may still be available if we try again
#  2 -- video is proved totally unavailable here. give up
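#
# A minimal conforming downloader, for illustration only (a sketch:
# "example.invalid" is a placeholder host, not a real mirror):
#
#     def example_dl(video: dict, basename: str, output: str) -> int:
#         try:
#             with urllib.request.urlopen(
#                     "https://example.invalid/%s.mp4" % video["id"],
#                     timeout=60) as response:
#                 with open(basename + ".mp4", "wb") as f:
#                     f.write(response.read())
#             return 0   # all good, video is downloaded
#         except HTTPError:
#             return 2   # definitely not here; give up
#         except Exception:
#             return 1   # transient; worth retrying later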


# Basic downloader template.
#
# This does a brute-force of all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be created using the video ID and
# extension. For example:
#    https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
        linktemplate: str, vexts: list, iexts: list) -> int:
    # actual downloader
    def basic_dl_impl(vid: str, ext: str) -> int:
        url = (linktemplate % (vid, ext))
        try:
            # the 60-second timeout is an arbitrary choice; without one,
            # urlopen can block forever and the TimeoutError handler
            # below would never fire
            with urllib.request.urlopen(url, timeout=60) as response:
                with open("%s.%s" % (basename, ext), "wb") as f:
                    f.write(response.read())
            print(" downloaded %s.%s" % (basename, ext))
            return 0
        except TimeoutError:
            return 1
        except HTTPError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!")
            print(e)
            return 1

    # The video extensions are required: if none of them work, the
    # video simply isn't hosted here. The image (thumbnail) extensions
    # are best-effort; a missing thumbnail (or an empty iexts list)
    # must not mark the video as unavailable.
    for exts, required in [(vexts, True), (iexts, False)]:
        for ext in exts:
            r = basic_dl_impl(video["id"], ext)
            if r == 0:
                break  # done!
            elif r == 1:
                # timeout; try again later?
                return 1
            elif r == 2:
                continue
        else:
            # we did not break out of the loop,
            # which means all extensions were unavailable
            if required:
                return 2

    # video was downloaded successfully
    return 0


# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
        "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
        ["mp4", "webm", "mkv"],
        [] # none
    )


# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, though it seems to be expanding to other stuff as well.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
        "https://media.desirintoplaisir.net/content/%s.%s",
        ["mp4", "webm", "mkv"],
        ["webp"]
    )


# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
# https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
# the CDX API is pretty slow though, so it should be used as a last resort.
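#
# A sketch of what that CDX lookup could look like (the endpoint and
# query parameters below are the standard CDX API ones; the i.ytimg.com
# thumbnail URL pattern is YouTube's usual one, but treat the details
# as assumptions):
#
#     query = ("https://web.archive.org/cdx/search/cdx?output=json"
#              "&limit=1&url=https://i.ytimg.com/vi/%s/hqdefault.jpg"
#              % video["id"])
#     rows = json.loads(urllib.request.urlopen(query).read())
#     # rows[0] is the header row; rows[1], if present, holds the
#     # timestamp and original URL of the first capture, replayable at
#     # https://web.archive.org/web/<timestamp>/<original>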
def wayback_dl(video: dict, basename: str, output: str) -> int:
    try:
        url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
               "e.org/yt/%s" % video["id"])
        with urllib.request.urlopen(url, timeout=60) as response:
            contenttype = response.getheader("Content-Type")
            if contenttype == "video/webm" or contenttype == "video/mp4":
                ext = contenttype.split("/")[-1]
            else:
                # not a video; raise a dummy HTTPError so the handler
                # below treats this the same as a 404
                raise HTTPError(url=None, code=None, msg=None,
                                hdrs=None, fp=None)
            with open("%s.%s" % (basename, ext), "wb") as f:
                f.write(response.read())
        print(" downloaded %s.%s" % (basename, ext))
        return 0
    except TimeoutError:
        return 1
    except HTTPError:
        # don't keep trying
        return 2
    except Exception as e:
        print(" unknown error downloading video!")
        print(e)
        return 1


# Internet Archive (tubeup)
def ia_dl(video: dict, basename: str, output: str) -> int:
    # note: this receives the *name* of the file, since that is what
    # the caller below passes in
    def ia_file_legit(fname: str, vidid: str) -> bool:
        # FIXME:
        #
        # There are some items on IA that combine the old tubeup behavior
        # (i.e., including the sanitized video name before the ID)
        # and the new tubeup behavior (filename only contains the video ID),
        # hence we will download the entire video twice.
        #
        # This isn't much of a problem anymore (and hasn't been for like 3
        # years), since I contributed code to not upload something if there
        # is already something there. However, we should handle this case
        # anyway.
        #
        # Additionally, there are some items that have duplicate video files
        # (from when the owners changed the title). We should ideally only
        # download unique files. IA seems to provide SHA1 hashes...
        #
        # We should also check whether the copy on IA is higher quality
        # than a local copy... :)
        if not re.search(r"((?:.+?-)?" + re.escape(vidid) + r"\.(?:mp4|jpg|"
                         r"webp|mkv|webm|info\.json|description|"
                         r"annotations\.xml))", fname):
            return False

        # now, check the metadata
        print(fname)
        return True
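
    # A sketch of the SHA1 dedup mentioned in the FIXME above
    # (hypothetical and unused; assumes each File object carries the
    # .sha1 attribute that IA's file metadata normally provides):
    #
    #     def unique_by_sha1(files):
    #         seen = set()
    #         for f in files:
    #             h = getattr(f, "sha1", None)
    #             if h is not None and h in seen:
    #                 continue
    #             seen.add(h)
    #             yield f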


    if not internetarchive.get_item("youtube-%s" % video["id"]).exists:
        return 2

    flist = [
        f.name
        for f in internetarchive.get_files("youtube-%s" % video["id"])
        if ia_file_legit(f.name, video["id"])
    ]
    while True:
        try:
            internetarchive.download("youtube-%s" % video["id"], files=flist,
                                     verbose=True, destdir=output,
                                     no_directory=True, ignore_existing=True,
                                     retries=9999)
            break
        except ConnectTimeout:
            time.sleep(1)
            continue
        except Exception as e:
            print(e)
            return 1

    # Newer versions of tubeup save only the video ID.
    # Account for this by replacing it.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    for fname in flist:
        # ignore any files whose names are not simply the ID
        if os.path.splitext(fname)[0] != video["id"]:
            continue

        if os.path.exists("%s/%s" % (output, fname)):
            # splitext()[1] includes the leading dot
            os.replace("%s/%s" % (output, fname),
                       "%s%s" % (basename, os.path.splitext(fname)[1]))
    return 0


def ytdlp_dl(video: dict, basename: str, output: str) -> int:
    # intentionally ignores all messages besides errors
    class MyLogger(object):
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)


    def ytdl_hook(d) -> None:
        if d["status"] == "finished":
            print(" downloaded %s:    100%% " % (os.path.basename(d["filename"])))
        if d["status"] == "downloading":
            print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
                                             d["_percent_str"]), end="")
        if d["status"] == "error":
            print("\n an error occurred downloading %s!"
                  % (os.path.basename(d["filename"])))

    ytdl_opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [ytdl_hook],
        "logger": MyLogger(),
        "ignoreerrors": False,

        #mm, output template
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }

    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
            return 0
        except DownloadError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!\n")
            print(e)

    return 1


# TODO: There are multiple other youtube archival websites available.
# Most notable is https://findyoutubevideo.thetechrobo.ca .
# This combines a lot of sparse youtube archival services, and has
# a convenient API we can use. Nice!
#
# There is also the "Distributed YouTube Archive", which is totally
# useless because there's no way to automate it...

##############################################################################


def main():
    # generator; builds the list of database files, then yields the
    # parsed form of each. note that what it yields is not necessarily
    # a plain dict: on the simdjson path, the data stays in the
    # parser's internal representation (hence as_dict() further below)
    def load_split_files(path: str):
        list_files = []

        # build the path list
        if not os.path.isdir(path):
            list_files.append(path)
        else:
            for fi in os.listdir(path):
                if re.search(r"vids[0-9\-]+?\.json", fi):
                    list_files.append(path + "/" + fi)

        # now open each as a json
        for fi in list_files:
            print(fi)
            with open(fi, "r", encoding="utf-8") as infile:
                if simdjson:
                    # Using this is a lot faster in SIMDJSON, since instead
                    # of converting all of the JSON key/value pairs into
                    # native Python objects, they stay in an internal state.
                    #
                    # This means we only get the stuff we absolutely need,
                    # which is the uploader ID, and copy everything else
                    # if the ID is one we are looking for.
                    parser = json.Parser()
                    yield parser.parse(infile.read())
                    del parser
                else:
                    # read() + loads() rather than load(): orjson (one
                    # of the possible json modules) has no load()
                    yield json.loads(infile.read())
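
    # For reference, the simdjson fast path above behaves roughly like
    # this (a sketch):
    #
    #     parser = json.Parser()
    #     doc = parser.parse(b'{"videos": [{"uploader_id": "x"}]}')
    #     doc["videos"][0]["uploader_id"]  # lazy; nothing copied yet
    #     doc.as_dict()                    # materialize a real dict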


    def write_metadata(i: dict, basename: str) -> None:
        # ehhh
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
                data = json.dumps(i)
                if isinstance(data, bytes):
                    # orjson outputs bytes; everything else outputs a string
                    data = data.decode("utf-8")
                jsonfile.write(data)
                print(" saved %s" % os.path.basename(jsonfile.name))
        if not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w",
                      encoding="utf-8") as descfile:
                descfile.write(i["description"])
                print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()

    for url in args["<url>"]:
        # take the last path component as the channel/uploader ID
        # (rstrip guards against URLs with a trailing slash)
        chn = url.rstrip("/").split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}

    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # find videos in the database.
    #
    # despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f["videos"] if "videos" in f else f)
        if "uploader_id" in i and i["uploader_id"] in channels
    ]

    while True:
        if len(videos) == 0:
            break

        # iterate over a shallow copy: we remove entries from 'videos'
        # inside the loop, and mutating a list while iterating over it
        # skips elements
        videos_copy = videos[:]

        for i in videos_copy:
            channel = channels[i["uploader_id"]]

            # precalculated for speed
            output = channel["output"]

            print("%s:" % i["id"])
            basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
                                     restricted=True), i["id"])
            files = [y
                     for p in ["mkv", "mp4", "webm"]
                     for y in Path(output).glob(("*-%s." + p) % i["id"])]
            if files:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function: tries each available
            # downloader in order of preference.
            def dl(video: dict, basename: str, output: str) -> int:
                dls = []

                if ytdlp_works:
                    dls.append({
                        "func": ytdlp_dl,
                        "name": "using yt-dlp",
                    })

                if ia_works:
                    dls.append({
                        "func": ia_dl,
                        "name": "from the Internet Archive",
                    })

                dls.append({
                    "func": desirintoplaisir_dl,
                    "name": "from LMIJLM/DJ Plaisir's archive",
                })
                dls.append({
                    "func": ghostarchive_dl,
                    "name": "from GhostArchive"
                })
                dls.append({
                    "func": wayback_dl,
                    "name": "from the Wayback Machine"
                })

                # nb: the loop variable must not be named "dl", or it
                # would shadow this function's own name
                for downloader in dls:
                    print(" attempting to download %s" % downloader["name"])
                    r = downloader["func"](video, basename, output)
                    if r == 0:
                        # all good, video's downloaded
                        return 0
                    elif r == 2:
                        # video is unavailable here
                        print(" oops, video is not available there...")
                        continue
                    elif r == 1:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        # was on, so we can continue back at it later.
                        return 1
                # video is unavailable everywhere
                return 2

            r = dl(i, basename, output)
            if r == 1:
                continue

            # video is downloaded, or it's totally unavailable, so
            # remove it from being checked again.
            videos.remove(i)
            # ... and then dump the metadata, if there isn't any on disk.
            write_metadata(i, basename)

            if r == 0:
                # video is downloaded
                continue

            # video is unavailable; write out the metadata.
            print(" video is unavailable everywhere; dumping out metadata only")


if __name__ == "__main__":
    main()