view channeldownloader.py @ 17:0d10b2ce0140

2026
author Paper <paper@tflc.us>
date Sat, 28 Feb 2026 20:59:59 -0500
parents 088d9a3a2524
children 05e71dd6b6ca
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes youtube videos from a channel from
# a variety of sources

# Copyright (c) 2021-2026 Paper <paper@tflc.us>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         yt-dlp style database of videos. Should contain
                               an array of yt-dlp .info.json data. For example,
                               FinnOtaku's YTPMV metadata archive.
"""

# Built-in python stuff (no possible missing dependencies)
from __future__ import print_function
import docopt
import os
import re
import time
import urllib.request
import os
import ssl
import io
import shutil
import xml.etree.ElementTree as XmlET
from urllib.error import HTTPError
from pathlib import Path

# We can utilize special simdjson features if it is available
simdjson = False

try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

ytdlp_works = False

try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    print("failed to import yt-dlp!")
    print("downloading from YouTube directly will not work.")

ia_works = False

try:
    import internetarchive
    from requests.exceptions import ConnectTimeout
    ia_works = True
except ImportError:
    print("failed to import the Internet Archive's python library!")
    print("downloading from IA will not work.")

zipfile_works = False

try:
    import zipfile
    zipfile_works = True
except ImportError:
    print("failed to import zipfile!")
    print("loading the database from a .zip file will not work.")

##############################################################################
## DOWNLOADERS

# All downloaders should be a function under this signature:
#    dl(video: dict, basename: str, output: str) -> int
# where:
#    'video': the .info.json scraped from the YTPMV metadata archive.
#    'basename': the basename output to write as.
#    'output': the output directory.
# yes, it's weird, but I don't care ;)
#
# Magic return values:
#  0 -- all good, video is downloaded
#  1 -- error downloading video; it may still be available if we try again
#  2 -- video is proved totally unavailable here. give up


# Basic downloader template.
#
# This does a brute-force of all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be created using the video ID and
# extension. For example:
#    https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
        linktemplate: str, vexts: list, iexts: list) -> int:
    # actual downloader
    def basic_dl_impl(vid: str, ext: str) -> int:
        url = (linktemplate % (vid, ext))
        try:
            with urllib.request.urlopen(url) as headers:
                with open("%s.%s" % (basename, ext), "wb") as f:
                    f.write(headers.read())
            print(" downloaded %s.%s" % (basename, ext))
            return 0
        except TimeoutError:
            return 1
        except HTTPError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!")
            print(e)
            return 1

    for exts in [vexts, iexts]:
        for ext in exts:
            r = basic_dl_impl(video["id"], ext)
            if r == 0:
                break  # done!
            elif r == 1:
                # timeout; try again later?
                return 1
            elif r == 2:
                continue
        else:
            # we did not break out of the loop
            # which means all extensions were unavailable
            return 2

    # video was downloaded successfully
    return 0


# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
        "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
        ["mp4", "webm", "mkv"],
        [] # none
    )


# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, however it seems to be growing to other stuff.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
        "https://media.desirintoplaisir.net/content/%s.%s",
        ["mp4", "webm", "mkv"],
        ["webp"]
    )


# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
# https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
# the CDX API is pretty slow though, so it should be used as a last resort.
def wayback_dl(video: dict, basename: str, output: str) -> int:
    try:
        url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
               "e.org/yt/%s" % video["id"])
        with urllib.request.urlopen(url) as headers:
            contenttype = headers.getheader("Content-Type")
            if contenttype == "video/webm" or contenttype == "video/mp4":
                ext = contenttype.split("/")[-1]
            else:
                raise HTTPError(url=None, code=None, msg=None,
                                hdrs=None, fp=None)
            with open("%s.%s" % (basename, ext), "wb") as f:
                f.write(headers.read())
        print(" downloaded %s.%s" % (basename, ext))
        return 0
    except TimeoutError:
        return 1
    except HTTPError:
        # dont keep trying
        return 2
    except Exception as e:
        print(" unknown error downloading video!")
        print(e)
        return 1


# Also captures the ID for comparison
IA_REGEX = re.compile(r"(?:(?P<date>\d{8}) - )?(?P<title>.+?)?(?:-| \[)?(?:(?P<id>[A-z0-9_\-]{11})]?|(?: \((?P<format>(?:(?:(?P<resolution>\d+)p_(?P<fps>\d+)fps_(?P<vcodec>H264)-)?(?P<abitrate>\d+)kbit_(?P<acodec>AAC|Vorbis))|BQ|Description)\)))\.(?P<extension>mp4|info\.json|description|annotations\.xml|webp|mkv|webm|jpg|jpeg|ogg|txt|m4a)$")


# Internet Archive (tubeup)
def ia_dl(video: dict, basename: str, output: str) -> int:
    def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool:
        # FIXME:
        #
        # There are some items on IA that combine the old tubeup behavior
        # (i.e., including the sanitized video name before the ID)
        # and the new tubeup behavior (filename only contains the video ID)
        # hence we will download the entire video twice.
        #
        # This isn't much of a problem anymore (and hasn't been for like 3
        # years), since I contributed code to not upload something if there
        # is already something there. However we should handle this case
        # anyway.
        #
        # Additionally, there are some items that have duplicate video files
        # (from when the owners changed the title). We should ideally only
        # download unique files. IA seems to provide SHA1 hashes...
        #
        # We should also check if whether the copy on IA is higher quality
        # than a local copy... :)

        IA_ID = "youtube-%s" % vidid

        # Ignore IA generated thumbnails
        if f.startswith("%s.thumbs/" % IA_ID) or f == "__ia_thumb.jpg":
            return False

        for i in ["_archive.torrent", "_files.xml", "_meta.sqlite", "_meta.xml"]:
            if f == (IA_ID + i):
                return False

        # Try to match with our known filename regex
        # This properly matches:
        #   ??????????? - YYYYMMDD - TITLE [ID].EXTENSION
        #   old tubeup  - TITLE-ID.EXTENSION
        #   tubeup      - ID.EXTENSION
        #   JDownloader - TITLE (FORMAT).EXTENSION
        # (Possibly we should match other filenames too??)
        m = re.match(IA_REGEX, f)
        if m is None:
            return False

        if m.group("id"):
            return (m.group("id") == vidid)
        elif m.group("title") is not None:
            def asciify(s: str) -> str:
                # Replace all non-ASCII chars with underscores, and get rid of any whitespace
                return ''.join([i if ord(i) >= 0x20 and ord(i) < 0x80 and i not in "/\\" else '_' for i in s]).strip()

            if asciify(m.group("title")) == asciify(vidtitle):
                return True  # Close enough

        # Uh oh
        return False

    def ia_get_original_files(identifier: str) -> typing.Optional[list]:
        def ia_xml(identifier: str) -> typing.Optional[str]:
            for _ in range(1, 9999):
                try:
                    with urllib.request.urlopen("https://archive.org/download/%s/%s_files.xml" % (identifier, identifier)) as req:
                        return req.read().decode("utf-8")
                except HTTPError as e:
                    if e.code == 404 or e.code == 503:
                        return None
                    time.sleep(5)

        d = ia_xml(identifier)
        if d is None:
            return None

        try:
            # Now parse the XML and make a list of each original file
            return [x.attrib["name"] for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d))]
        except Exception as e:
            print(e)
            return None

    originalfiles = ia_get_original_files("youtube-%s" % video["id"])
    if not originalfiles:
        return 2

    flist = [
        f
        for f in originalfiles
        if ia_file_legit(f, video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"])
    ]

    if not flist:
        return 2 # ??????

    while True:
        try:
            internetarchive.download("youtube-%s" % video["id"], files=flist,
                                     verbose=True, ignore_existing=True,
                                     retries=9999)
            break
        except ConnectTimeout:
            time.sleep(1)
            continue
        except Exception as e:
            print(e)
            return 1

    # Newer versions of tubeup save only the video ID.
    # Account for this by replacing it.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    #
    # paper/2026-02-27: an update in the IA python library changed
    # the way destdir works, so it just gets entirely ignored.
    for fname in flist:
        def getext(s: str, vidid: str) -> typing.Optional[str]:
            # special cases
            for i in [".info.json", ".annotations.xml"]:
                if s.endswith(i):
                    return i

            # Handle JDownloader "TITLE (Description).txt"
            if s.endswith(" (Description).txt"):
                return ".description"

            # Catch-all for remaining extensions
            spli = os.path.splitext(s)
            if spli is None or len(spli) != 2:
                return None

            return spli[1]

        ondisk = "youtube-%s/%s" % (video["id"], fname)

        if not os.path.exists(ondisk):
            continue

        ext = getext(fname, video["id"])
        if ext is None:
            continue

        os.replace(ondisk, "%s%s" % (basename, ext))

    shutil.rmtree("youtube-%s" % video["id"])

    return 0


def ytdlp_dl(video: dict, basename: str, output: str) -> int:
    # intentionally ignores all messages besides errors
    class MyLogger(object):
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)
            pass


    def ytdl_hook(d) -> None:
        if d["status"] == "finished":
            print(" downloaded %s:    100%% " % (os.path.basename(d["filename"])))
        if d["status"] == "downloading":
            print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
                                             d["_percent_str"]), end="")
        if d["status"] == "error":
            print("\n an error occurred downloading %s!"
                  % (os.path.basename(d["filename"])))

    ytdl_opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [ytdl_hook],
        "logger": MyLogger(),
        "ignoreerrors": False,

        #mm, output template
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }

    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
            return 0
        except DownloadError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!\n")
            print(e)

    return 1


# TODO: There are multiple other youtube archival websites available.
# Most notable is https://findyoutubevideo.thetechrobo.ca .
# This combines a lot of sparse youtube archival services, and has
# a convenient API we can use. Nice!
#
# There is also the "Distributed YouTube Archive" which is totally
# useless because there's way to automate it...

##############################################################################


def main():
    def load_split_files(path: str):
        def cruft(isdir: bool, listdir, openf):
            # build the path list
            if not isdir:
                list_files = [path]
            else:
                list_files = filter(lambda x: re.search(r"vids[0-9\-]+?\.json", x), listdir())

            # now open each as a json
            for fi in list_files:
                print(fi)
                with openf(fi, "r") as infile:
                    if simdjson:
                        # Using this is a lot faster in SIMDJSON, since instead
                        # of converting all of the JSON key/value pairs into
                        # native Python objects, they stay in an internal state.
                        #
                        # This means we only get the stuff we absolutely need,
                        # which is the uploader ID, and copy everything else
                        # if the ID is one we are looking for.
                        parser = json.Parser()
                        yield parser.parse(infile.read())
                        del parser
                    else:
                        yield json.load(infile)


        try:
            if not zipfile_works or os.path.isdir(path):
                raise Exception

            with zipfile.ZipFile(path, "r") as myzip:
                yield from cruft(True, lambda: myzip.namelist(), lambda f, m: io.TextIOWrapper(myzip.open(f, mode=m), encoding="utf-8"))
        except Exception as e:
            yield from cruft(os.path.isdir(path), lambda: os.listdir(path), lambda f, m: open(path + "/" + f, m, encoding="utf-8"))


    def write_metadata(i: dict, basename: str) -> None:
        # ehhh
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
                try:
                    # orjson outputs bytes
                    jsonfile.write(json.dumps(i).decode("utf-8"))
                except AttributeError:
                    # everything else outputs a string
                    jsonfile.write(json.dumps(i))
                print(" saved %s" % os.path.basename(jsonfile.name))
        if not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w",
                      encoding="utf-8") as descfile:
                descfile.write(i["description"])
                print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()

    for url in args["<url>"]:
        chn = url.split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}

    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # find videos in the database.
    #
    # despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f if not "videos" in f else f["videos"]) # logic is reversed kinda, python is weird
        if "uploader_id" in i and i["uploader_id"] in channels
    ]

    while True:
        if len(videos) == 0:
            break

        videos_copy = videos

        for i in videos_copy:
            channel = channels[i["uploader_id"]]

            # precalculated for speed
            output = channel["output"]

            print("%s:" % i["id"])
            basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
                                     restricted=True), i["id"])
            def filenotworthit(f) -> bool:
                try:
                    return bool(os.path.getsize(f))
                except:
                    return False

            pathoutput = Path(output)

            # This is terrible
            files = list(filter(filenotworthit, [y
                     for p in ["mkv", "mp4", "webm"]
                     for y in pathoutput.glob(("*-%s." + p) % i["id"])]))
            if files:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function.
            def dl(video: dict, basename: str, output: str):
                dls = []

                if ytdlp_works:
                    dls.append({
                        "func": ytdlp_dl,
                        "name": "using yt-dlp",
                    })

                if ia_works:
                    dls.append({
                        "func": ia_dl,
                        "name": "from the Internet Archive",
                    })

                dls.append({
                    "func": desirintoplaisir_dl,
                    "name": "from LMIJLM/DJ Plaisir's archive",
                })
                dls.append({
                    "func": ghostarchive_dl,
                    "name": "from GhostArchive"
                })
                dls.append({
                    "func": wayback_dl,
                    "name": "from the Wayback Machine"
                })

                for dl in dls:
                    print(" attempting to download %s" % dl["name"])
                    r = dl["func"](i, basename, output)
                    if r == 0:
                        # all good, video's downloaded
                        return 0
                    elif r == 2:
                        # video is unavailable here
                        print(" oops, video is not available there...")
                        continue
                    elif r == 1:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        # was on, so we can continue back at it later.
                        return 1
                # video is unavailable everywhere
                return 2

            r = dl(i, basename, output)
            if r == 1:
                continue

            # video is downloaded, or it's totally unavailable, so
            # remove it from being checked again.
            videos.remove(i)
            # ... and then dump the metadata, if there isn't any on disk.
            write_metadata(i, basename)

            if r == 0:
                # video is downloaded
                continue

            # video is unavailable; write out the metadata.
            print(" video is unavailable everywhere; dumping out metadata only")


if __name__ == "__main__":
    main()