view channeldownloader.py @ 18:05e71dd6b6ca default tip

no more ia python library
author Paper <paper@tflc.us>
date Sat, 28 Feb 2026 22:31:59 -0500
parents 0d10b2ce0140
children
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes youtube videos from a channel from
# a variety of sources

# Copyright (c) 2021-2026 Paper <paper@tflc.us>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Okay, this is a bit of a clusterfuck.
#
# This originated as a script that simply helped me scrape a bunch
# of videos off some deleted channels (in fact, that's still its main
# purpose) and was very lackluster (hardcoded shite everywhere).
# Fortunately in recent times I've cleaned up the code and added some
# other mirrors, as well as improved the archive.org scraper to not
# shoot itself when it encounters an upload that's not from tubeup.
#
# Nevertheless, I still consider much of this file to be dirty hacks,
# especially some of the HTTP stuff.

"""
Usage:
  channeldownloader.py <url>... (--database <file>)
                                [--output <folder>]
  channeldownloader.py -h | --help

Arguments:
  <url>                        YouTube channel URL to download from

Options:
  -h --help                    Show this screen
  -o --output <folder>         Output folder, relative to the current directory
                               [default: .]
  -d --database <file>         yt-dlp style database of videos. Should contain
                               an array of yt-dlp .info.json data. For example,
                               FinnOtaku's YTPMV metadata archive.
"""

# Standard-library imports (no possible missing dependencies)
from __future__ import print_function

import enum
import io
import os
import re
import shutil
import ssl
import time
import typing
import urllib.parse
import urllib.request
import xml.etree.ElementTree as XmlET
from pathlib import Path
from urllib.error import HTTPError

# Third-party, but required for argument parsing
import docopt

# We can utilize special simdjson features if it is available
simdjson = False

try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

# Whether yt-dlp imported successfully; when False, main() skips the
# yt-dlp downloader in its downloader list.
ytdlp_works = False

try:
    # yt-dlp is the maintained youtube-dl fork; alias it to the old name.
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    # NOTE(review): main() also calls sanitize_filename unconditionally,
    # so without yt-dlp installed more than just direct YouTube
    # downloads will break — confirm before relying on this fallback.
    print("failed to import yt-dlp!")
    print("downloading from YouTube directly will not work.")

# Whether the (standard library) zipfile module is importable; guards
# the .zip database path in load_split_files().
zipfile_works = False

try:
    import zipfile
    zipfile_works = True
except ImportError:
    print("failed to import zipfile!")
    print("loading the database from a .zip file will not work.")


##############################################################################
## DOWNLOADERS

# All downloaders should be a function under this signature:
#    dl(video: dict, basename: str, output: str) -> int
# where:
#    'video': the .info.json scraped from the YTPMV metadata archive.
#    'basename': the basename output to write as.
#    'output': the output directory.
# yes, it's weird, but I don't care ;)

class DownloaderStatus(enum.Enum):
    """Result of one download attempt from one provider."""
    # Download finished successfully.
    SUCCESS = 0
    # Download failed, but possibly only *at this time*; the caller may
    # retry later. This should NOT be used when the video simply is not
    # there (that is UNAVAILABLE). Transient failures are very common
    # for the Internet Archive.
    ERROR = 1
    # Video is unavailable from this provider (e.g. HTTP 404).
    UNAVAILABLE = 2


def download_file(url: str, path: str, guessext: bool = False,
                  length: typing.Optional[int] = None) -> DownloaderStatus:
    """Download `url` to `path`, printing progress to the screen.

    guessext: when True, append an extension guessed from the
        Content-Type header (".mp4" or ".webm") to `path`; any other
        MIME type is treated as an error.
    length: expected size in bytes; when None, the server's
        Content-Length header is used if available.

    Returns a DownloaderStatus. HTTP errors >= 500 map to ERROR (likely
    transient); other HTTP errors map to UNAVAILABLE.
    """
    # Download in 32KiB chunks
    CHUNK_SIZE = 32768

    try:
        with urllib.request.urlopen(url) as http:
            # `.headers` is available on both http(s) responses and
            # file:// responses, unlike HTTPResponse.getheader().
            headers = getattr(http, "headers", None)

            if length is None and headers is not None:
                # Knowing the total size up front lets us pre-allocate
                # the output file and print progress as a percentage.
                try:
                    length = int(headers.get("Content-Length"))
                except (TypeError, ValueError):
                    length = None

            if guessext:
                # Guess the file extension from the MIME type.
                mime = headers.get("Content-Type") if headers is not None else None
                if not mime:
                    return DownloaderStatus.ERROR
                mime = mime.split(";")[0].strip()  # drop "; charset=..."

                if mime == "video/mp4":
                    path += ".mp4"
                elif mime == "video/webm":
                    path += ".webm"
                else:
                    return DownloaderStatus.ERROR

            # Create the parent directory if needed; "" means the
            # current directory, which always exists.
            par = os.path.dirname(path)
            if par and not os.path.isdir(par):
                os.makedirs(par)

            with open(path, "wb") as f:
                if length is not None:
                    # Tell the filesystem up front how much we will
                    # write. (This used to run before the file was even
                    # opened, so it raised NameError and the header was
                    # silently discarded.)
                    f.truncate(length)

                # Pull the body down in chunks.
                # Keep the status line under 79 chars.
                while True:
                    data = http.read(CHUNK_SIZE)
                    if not data:
                        break

                    f.write(data)
                    print("\r downloading to %s, " % path, end="")
                    if length:
                        print("%.2f%%" % (f.tell() / length * 100.0), end="")
                    else:
                        print("%.2f MiB" % (f.tell() / (1 << 20)), end="")

                print("\r downloaded to %s        " % path)

                if length is not None and length != f.tell():
                    # Server lied about what the length was?
                    print(" INFO: HTTP server's Content-Length header lied??")
    except TimeoutError:
        return DownloaderStatus.ERROR
    except HTTPError as e:
        # 5xx is a server-side (possibly transient) failure; anything
        # else (404 and friends) means the file just isn't there.
        if e.code >= 500:
            return DownloaderStatus.ERROR
        return DownloaderStatus.UNAVAILABLE
    except Exception as e:
        print(" unknown error downloading video;", e)
        return DownloaderStatus.ERROR

    return DownloaderStatus.SUCCESS


# Basic downloader template.
#
# This does a brute-force of all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be created using the video ID and
# extension. For example:
#    https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
        linktemplate: str, vexts: list, iexts: list) -> DownloaderStatus:
    """Brute-force downloader shared by the simple CDN mirrors.

    Tries every extension in `vexts` (video files, required) and then
    `iexts` (companion images, best-effort) against `linktemplate`,
    which is filled in with (video id, extension), e.g.:
        https://cdn.ytarchiver.com/%s.%s

    Returns SUCCESS once a video file downloads, UNAVAILABLE when no
    video extension works, and ERROR on transient failures.
    """
    def try_ext(ext: str) -> DownloaderStatus:
        url = linktemplate % (video["id"], ext)
        return download_file(url, "%s.%s" % (basename, ext))

    for required, exts in ((True, vexts), (False, iexts)):
        got_one = False
        for ext in exts:
            r = try_ext(ext)
            if r == DownloaderStatus.SUCCESS:
                got_one = True
                break
            elif r == DownloaderStatus.ERROR:
                # transient failure; let the caller retry later
                return DownloaderStatus.ERROR
            # UNAVAILABLE: fall through and try the next extension

        # Only the video list is mandatory. (The old for/else version
        # returned UNAVAILABLE whenever `iexts` was empty or had no
        # hits, even after the video itself downloaded fine.)
        if required and not got_one:
            return DownloaderStatus.UNAVAILABLE

    # video was downloaded successfully
    return DownloaderStatus.SUCCESS


# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """GhostArchive mirror: video files only, no thumbnails."""
    template = "https://ghostvideo.b-cdn.net/chimurai/%s.%s"
    video_exts = ["mp4", "webm", "mkv"]
    image_exts = []  # none hosted here
    return basic_dl_template(video, basename, output,
                             template, video_exts, image_exts)


# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, however it seems to be growing to other stuff.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """media.desirintoplaisir.net mirror (videos plus webp thumbnails)."""
    template = "https://media.desirintoplaisir.net/content/%s.%s"
    video_exts = ["mp4", "webm", "mkv"]
    image_exts = ["webp"]
    return basic_dl_template(video, basename, output,
                             template, video_exts, image_exts)


# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
# https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
# the CDX API is pretty slow though, so it should be used as a last resort.
def wayback_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Fetch the video through the Wayback Machine's fake-URL redirect."""
    PREFIX = ("https://web.archive.org/web/2oe_/"
              "http://wayback-fakeurl.archive.org/yt/")
    url = PREFIX + video["id"]
    # guessext=True: the extension depends on what was archived.
    return download_file(url, basename, True)


# Matches the known Internet Archive filename layouts and captures the
# video ID (when present) for comparison.
# NOTE: the ID class used to be [A-z0-9_\-]; [A-z] accidentally also
# matches the punctuation between 'Z' and 'a' ("[", "\", "]", "^", "_",
# "`"). YouTube IDs only ever use A-Za-z0-9_- .
IA_REGEX = re.compile(r"(?:(?P<date>\d{8}) - )?(?P<title>.+?)?(?:-| \[)?(?:(?P<id>[A-Za-z0-9_\-]{11})]?|(?: \((?P<format>(?:(?:(?P<resolution>\d+)p_(?P<fps>\d+)fps_(?P<vcodec>H264)-)?(?P<abitrate>\d+)kbit_(?P<acodec>AAC|Vorbis))|BQ|Description)\)))\.(?P<extension>mp4|info\.json|description|annotations\.xml|webp|mkv|webm|jpg|jpeg|ogg|txt|m4a)$")


# Internet Archive (tubeup)
#
# NOTE: We don't actually need the python library anymore; we already
# explicitly download the file listing using our own logic, so there's
# really nothing stopping us from going ahead and downloading everything
# else using the download_file function.
def ia_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Download a video and its companion files from the Internet
    Archive's "youtube-<ID>" items (as uploaded by tubeup and friends).

    We fetch the item's _files.xml listing ourselves and then pull each
    original file with download_file() — no IA python library needed.
    """
    def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool:
        """Return True when item file `f` plausibly belongs to the video."""
        # FIXME: some items combine the old tubeup naming (sanitized
        # title before the ID) with the new one (ID only), so we end up
        # downloading the video twice. This hasn't been much of a
        # problem for ~3 years (tubeup now refuses to re-upload), but
        # we should handle it anyway.
        #
        # Additionally, some items carry duplicate video files from
        # when the owners changed the title; IA provides SHA1 hashes we
        # could use to only download unique files. We should also check
        # whether the copy on IA is higher quality than a local one... :)

        IA_ID = "youtube-%s" % vidid

        # Ignore IA generated thumbnails
        if f.startswith("%s.thumbs/" % IA_ID) or f == "__ia_thumb.jpg":
            return False

        # Ignore IA item bookkeeping files
        for suffix in ("_archive.torrent", "_files.xml", "_meta.sqlite", "_meta.xml"):
            if f == (IA_ID + suffix):
                return False

        # Try to match with our known filename regex.
        # This properly matches:
        #   ??????????? - YYYYMMDD - TITLE [ID].EXTENSION
        #   old tubeup  - TITLE-ID.EXTENSION
        #   tubeup      - ID.EXTENSION
        #   JDownloader - TITLE (FORMAT).EXTENSION
        # (Possibly we should match other filenames too??)
        m = IA_REGEX.match(f)
        if m is None:
            return False

        if m.group("id"):
            return (m.group("id") == vidid)
        elif m.group("title") is not None:
            def asciify(s: str) -> str:
                # Replace non-printable/non-ASCII chars (and path
                # separators) with underscores, then trim whitespace.
                return ''.join([c if ord(c) >= 0x20 and ord(c) < 0x80 and c not in "/\\" else '_' for c in s]).strip()

            if asciify(m.group("title")) == asciify(vidtitle):
                return True  # Close enough

        # Uh oh
        return False

    def ia_get_original_files(identifier: str) -> typing.Optional[list]:
        """Fetch and parse <identifier>_files.xml. Returns a list of
        {"name": ..., "size": ...} dicts for source="original" files,
        or None when the item is missing or unreadable."""
        def ia_xml(identifier: str) -> typing.Optional[str]:
            # Retry (nearly) forever on transient HTTP errors; give up
            # immediately on 404 (no such item) and 503 (treated as
            # permanently unavailable here).
            for _ in range(9999):
                try:
                    with urllib.request.urlopen("https://archive.org/download/%s/%s_files.xml" % (identifier, identifier)) as req:
                        return req.read().decode("utf-8")
                except HTTPError as e:
                    if e.code == 404 or e.code == 503:
                        return None
                    time.sleep(5)

        d = ia_xml(identifier)
        if d is None:
            return None

        try:
            originals = []

            # Parse the XML and record each original file's name/size.
            for node in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d)):
                entry = {"name": node.attrib["name"]}

                sz = node.find("size")
                if sz is not None:
                    entry["size"] = int(sz.text)

                originals.append(entry)

            return originals

        except Exception as e:
            print(e)
            return None

    IA_IDENTIFIER = "youtube-%s" % video["id"]

    originalfiles = ia_get_original_files(IA_IDENTIFIER)
    if not originalfiles:
        return DownloaderStatus.UNAVAILABLE

    vidtitle = video["fulltitle"] if "fulltitle" in video else video["title"]
    flist = [f for f in originalfiles
             if ia_file_legit(f["name"], video["id"], vidtitle)]

    if not flist:
        # Item exists but contains nothing recognizable for this video.
        return DownloaderStatus.UNAVAILABLE

    for entry in flist:
        path = "%s/%s" % (IA_IDENTIFIER, entry["name"])
        url = "https://archive.org/download/" + urllib.parse.quote(path, encoding="utf-8")
        for _ in range(9):
            r = download_file(url, path, False, entry.get("size"))
            if r == DownloaderStatus.SUCCESS:
                break
            elif r == DownloaderStatus.UNAVAILABLE:
                return DownloaderStatus.UNAVAILABLE
            # ERROR: sleep for a bit and retry
            time.sleep(1.0)
        else:
            # All retries failed; report ERROR so the caller comes back
            # later. (Previously this fell through silently and the
            # whole call still claimed SUCCESS with files missing.)
            return DownloaderStatus.ERROR

    def getext(s: str) -> typing.Optional[str]:
        """Extension (including the leading dot) to use when renaming."""
        # Multi-dot extensions that os.path.splitext would mangle
        for suffix in (".info.json", ".annotations.xml"):
            if s.endswith(suffix):
                return suffix

        # Handle JDownloader "TITLE (Description).txt"
        if s.endswith(" (Description).txt"):
            return ".description"

        # Catch-all for remaining extensions
        spli = os.path.splitext(s)
        if spli is None or len(spli) != 2:
            return None

        return spli[1]

    # Newer versions of tubeup save only the video ID; rename
    # everything onto our "<title>-<id>" basename.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    for entry in flist:
        ondisk = "%s/%s" % (IA_IDENTIFIER, entry["name"])

        if not os.path.exists(ondisk):
            continue

        ext = getext(entry["name"])
        if ext is None:
            continue

        os.replace(ondisk, "%s%s" % (basename, ext))

    shutil.rmtree(IA_IDENTIFIER)

    return DownloaderStatus.SUCCESS


def ytdlp_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Grab the video straight from YouTube with yt-dlp."""

    class _QuietLogger(object):
        """Logger that swallows everything except errors."""
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)

    def _progress(d) -> None:
        # Progress hook: yt-dlp reports "downloading", "finished" and
        # "error" events here.
        status = d["status"]
        if status == "downloading":
            print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
                                             d["_percent_str"]), end="")
        elif status == "finished":
            print(" downloaded %s:    100%% " % (os.path.basename(d["filename"])))
        elif status == "error":
            print("\n an error occurred downloading %s!"
                  % (os.path.basename(d["filename"])))

    opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [_progress],
        "logger": _QuietLogger(),
        "ignoreerrors": False,
        # yummy
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }

    try:
        with youtube_dl.YoutubeDL(opts) as ytdl:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
        return DownloaderStatus.SUCCESS
    except DownloadError:
        return DownloaderStatus.UNAVAILABLE
    except Exception as e:
        print(" unknown error downloading video!\n")
        print(e)

    return DownloaderStatus.ERROR


# TODO: There are multiple other youtube archival websites available.
# Most notable is https://findyoutubevideo.thetechrobo.ca .
# This combines a lot of sparse youtube archival services, and has
# a convenient API we can use. Nice!
#
# There is also the "Distributed YouTube Archive" which is totally
# useless because there's no way to automate it...

##############################################################################


def main():
    def load_split_files(path: str):
        """Yield parsed JSON documents from the database at `path`.

        `path` may be a single .json file, a directory containing
        vids*.json chunks, or a .zip archive of such chunks.
        """
        def parse_files(names, openf):
            # `names`: file names to read; `openf(name, mode)` opens
            # one of them as a text stream.
            for fi in names:
                print(fi)
                with openf(fi, "r") as infile:
                    if simdjson:
                        # simdjson is a lot faster this way: the parsed
                        # document stays in the parser's internal state
                        # instead of becoming native Python objects, so
                        # we only materialize the videos we keep.
                        parser = json.Parser()
                        yield parser.parse(infile.read())
                        del parser
                    else:
                        yield json.load(infile)

        chunk_re = r"vids[0-9\-]+?\.json"

        if zipfile_works and not os.path.isdir(path):
            try:
                with zipfile.ZipFile(path, "r") as myzip:
                    names = [n for n in myzip.namelist() if re.search(chunk_re, n)]
                    yield from parse_files(names, lambda f, m: io.TextIOWrapper(myzip.open(f, mode=m), encoding="utf-8"))
                return
            except zipfile.BadZipFile:
                pass  # not a zip; treat it as a plain file below

        if os.path.isdir(path):
            names = [n for n in os.listdir(path) if re.search(chunk_re, n)]
            yield from parse_files(names, lambda f, m: open(path + "/" + f, m, encoding="utf-8"))
        else:
            # A single database file: open it directly. (The old code
            # glued `path + "/" + path` together here and never worked.)
            yield from parse_files([path], lambda f, m: open(f, m, encoding="utf-8"))


    def write_metadata(i: dict, basename: str) -> None:
        """Dump .info.json and .description next to the video, unless
        files by those names already exist."""
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
                dumped = json.dumps(i)
                if isinstance(dumped, bytes):
                    # orjson outputs bytes; everything else, a string
                    dumped = dumped.decode("utf-8")
                jsonfile.write(dumped)
                print(" saved %s" % os.path.basename(jsonfile.name))
        if not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w",
                      encoding="utf-8") as descfile:
                # tolerate entries with no description rather than crash
                descfile.write(i.get("description") or "")
                print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()

    for url in args["<url>"]:
        # The channel name is the last URL component; tolerate a
        # trailing slash (which used to produce an empty name that
        # could never match an uploader_id).
        chn = url.rstrip("/").split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}

    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # Find this run's videos in the database.
    #
    # Despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f["videos"] if "videos" in f else f)
        if "uploader_id" in i and i["uploader_id"] in channels
    ]

    while videos:
        # Iterate over a snapshot: we remove finished entries from
        # `videos` as we go, and removing from the list being iterated
        # skips elements. (The old `videos_copy = videos` was a mere
        # alias, not a copy.)
        for i in list(videos):
            channel = channels[i["uploader_id"]]

            # precalculated for speed
            output = channel["output"]

            print("%s:" % i["id"])
            basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
                                     restricted=True), i["id"])

            def file_has_data(f) -> bool:
                # True when the file exists with nonzero size.
                try:
                    return bool(os.path.getsize(f))
                except OSError:
                    return False

            pathoutput = Path(output)

            # Already downloaded? Look for any non-empty video file
            # matching "*-<id>.<ext>" in the channel directory.
            existing = [y
                        for ext in ("mkv", "mp4", "webm")
                        for y in pathoutput.glob(("*-%s." + ext) % i["id"])
                        if file_has_data(y)]
            if existing:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function.
            def dl(video: dict, basename: str, output: str) -> DownloaderStatus:
                """Try each downloader in order of preference."""
                downloaders = []

                if ytdlp_works:
                    downloaders.append((ytdlp_dl, "using yt-dlp"))

                downloaders.append((ia_dl, "from the Internet Archive"))
                downloaders.append((desirintoplaisir_dl, "from LMIJLM/DJ Plaisir's archive"))
                downloaders.append((ghostarchive_dl, "from GhostArchive"))
                downloaders.append((wayback_dl, "from the Wayback Machine"))

                for func, name in downloaders:
                    print(" attempting to download %s" % name)
                    r = func(video, basename, output)
                    if r == DownloaderStatus.SUCCESS:
                        # all good, video's downloaded
                        return DownloaderStatus.SUCCESS
                    elif r == DownloaderStatus.UNAVAILABLE:
                        print(" oops, video is not available there...")
                    elif r == DownloaderStatus.ERROR:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        # was on, so we can continue back at it later.
                        return DownloaderStatus.ERROR

                return DownloaderStatus.UNAVAILABLE

            r = dl(i, basename, output)
            if r == DownloaderStatus.ERROR:
                # temporary failure; keep the video queued for the
                # next pass of the while loop.
                continue

            # Downloaded or permanently unavailable: either way, stop
            # checking it, and dump the metadata if it isn't on disk.
            videos.remove(i)
            write_metadata(i, basename)

            if r == DownloaderStatus.UNAVAILABLE:
                print(" video is unavailable everywhere; dumping out metadata only")


if __name__ == "__main__":
    main()