Mercurial > channeldownloader
diff channeldownloader.py @ 14:03c8fd4069fb default tip
*: big refactor, switch to GPLv2, and add README
Okay: now, we use a modular approach for downloaders. Each downloader
is provided through a single function (which does the fetching).
Additionally, the internetarchive library is optional now if the user
does not want to install it.
yt-dlp is still necessary though for it's sanitize_filename function.
If and when I get to adding vanity features (such as finding the best
possible source by comparing resolution and bitrate), I'll probably
separate out all of the downloaders into different files.
I also moved this project to a separate repository from 'codedump',
keeping all of the relevant commit history :)
| author | Paper <paper@tflc.us> |
|---|---|
| date | Sat, 30 Aug 2025 17:09:56 -0400 |
| parents | 2e7a3725ad21 |
| children |
line wrap: on
line diff
--- a/channeldownloader.py Fri Apr 14 23:53:37 2023 -0400 +++ b/channeldownloader.py Sat Aug 30 17:09:56 2025 -0400 @@ -1,4 +1,22 @@ #!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# channeldownloader.py - scrapes youtube videos from a channel from +# a variety of sources + +# Copyright (c) 2021-2025 Paper <paper@tflc.us> +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + """ Usage: channeldownloader.py <url>... (--database <file>) @@ -12,218 +30,489 @@ -h --help Show this screen -o --output <folder> Output folder, relative to the current directory [default: .] - -d --database <file> YTPMV_Database compatible JSON file + -d --database <file> yt-dlp style database of videos. Should contain + an array of yt-dlp .info.json data. For example, + FinnOtaku's YTPMV metadata archive. """ + +# Built-in python stuff (no possible missing dependencies) from __future__ import print_function import docopt -import internetarchive -try: - import orjson as json -except ImportError: - import json import os import re import time import urllib.request -import requests # need this for ONE (1) exception -import yt_dlp as youtube_dl +import os +import ssl from urllib.error import HTTPError -from yt_dlp.utils import sanitize_filename, DownloadError from pathlib import Path -from requests.exceptions import ConnectTimeout + +# We can utilize special simdjson features if it is available +simdjson = False +try: + import simdjson as json + simdjson = True + print("INFO: using simdjson") +except ImportError: + try: + import ujson as json + print("INFO: using ujson") + except ImportError: + try: + import orjson as json + print("INFO: using orjson") + except ImportError: + import json + print("INFO: using built-in json (slow!)") -class MyLogger(object): - def debug(self, msg): - pass - - def warning(self, msg): - pass +ytdlp_works = False - def error(self, msg): - print(" " + msg) - pass +try: + import yt_dlp as youtube_dl + from yt_dlp.utils import sanitize_filename, DownloadError + ytdlp_works = True +except ImportError: + print("failed to import yt-dlp!") + print("downloading from YouTube directly will not work.") +ia_works = False -def ytdl_hook(d) -> None: - if d["status"] == "finished": - print(" downloaded %s: 100%% " % (os.path.basename(d["filename"]))) - if d["status"] == "downloading": - print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), - d["_percent_str"]), end="") - if d["status"] == "error": - print("\n an error occurred downloading %s!" - % (os.path.basename(d["filename"]))) +try: + import internetarchive + from requests.exceptions import ConnectTimeout + ia_works = True +except ImportError: + print("failed to import the Internet Archive's python library!") + print("downloading from IA will not work.") + +############################################################################## +## DOWNLOADERS + +# All downloaders should be a function under this signature: +# dl(video: dict, basename: str, output: str) -> int +# where: +# 'video': the .info.json scraped from the YTPMV metadata archive. +# 'basename': the basename output to write as. +# 'output': the output directory. +# yes, it's weird, but I don't care ;) +# +# Magic return values: +# 0 -- all good, video is downloaded +# 1 -- error downloading video; it may still be available if we try again +# 2 -- video is proved totally unavailable here. give up -def load_split_files(path: str): - if not os.path.isdir(path): - yield json.load(open(path, "r", encoding="utf-8")) - for fi in os.listdir(path): - if re.search(r"vids[0-9\-]+?\.json", fi): - with open(path + "/" + fi, "r", encoding="utf-8") as infile: - print(fi) - yield json.load(infile) +# Basic downloader template. +# +# This does a brute-force of all extensions within vexts and iexts +# in an attempt to find a working video link. +# +# linktemplate is a template to be created using the video ID and +# extension. For example: +# https://cdn.ytarchiver.com/%s.%s +def basic_dl_template(video: dict, basename: str, output: str, + linktemplate: str, vexts: list, iexts: list) -> int: + # actual downloader + def basic_dl_impl(vid: str, ext: str) -> int: + url = (linktemplate % (vid, ext)) + try: + with urllib.request.urlopen(url) as headers: + with open("%s.%s" % (basename, ext), "wb") as f: + f.write(headers.read()) + print(" downloaded %s.%s" % (basename, ext)) + return 0 + except TimeoutError: + return 1 + except HTTPError: + return 2 + except Exception as e: + print(" unknown error downloading video!") + print(e) + return 1 + for exts in [vexts, iexts]: + for ext in exts: + r = basic_dl_impl(video["id"], ext) + if r == 0: + break # done! + elif r == 1: + # timeout; try again later? + return 1 + elif r == 2: + continue + else: + # we did not break out of the loop + # which means all extensions were unavailable + return 2 -def reporthook(count: int, block_size: int, total_size: int) -> None: - global start_time - if count == 0: - start_time = time.time() - return - percent = int(count * block_size * 100 / total_size) - print(" downloading %d%% \r" % (percent), end="") + # video was downloaded successfully + return 0 -def write_metadata(i: dict, basename: str) -> None: - if not os.path.exists(basename + ".info.json"): - with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile: - try: - jsonfile.write(json.dumps(i).decode("utf-8")) - except AttributeError: - jsonfile.write(json.dumps(i)) - print(" saved %s" % os.path.basename(jsonfile.name)) - if not os.path.exists(basename + ".description"): - with open(basename + ".description", "w", - encoding="utf-8") as descfile: - descfile.write(i["description"]) - print(" saved %s" % os.path.basename(descfile.name)) +# GhostArchive, basic... +def ghostarchive_dl(video: dict, basename: str, output: str) -> int: + return basic_dl_template(video, basename, output, + "https://ghostvideo.b-cdn.net/chimurai/%s.%s", + ["mp4", "webm", "mkv"], + [] # none + ) -def wayback_machine_dl(video: dict, basename: str) -> int: +# media.desirintoplaisir.net +# +# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities) +# or weeb shit, however it seems to be growing to other stuff. +# +# there isn't really a proper API; I've based the scraping off of the HTML +# and the public source code. +def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int: + return basic_dl_template(video, basename, output, + "https://media.desirintoplaisir.net/content/%s.%s", + ["mp4", "webm", "mkv"], + ["webp"] + ) + + +# Internet Archive's Wayback Machine +# +# Internally, IA's javascript routines forward to the magic +# URL used here. +# +# TODO: Download thumbnails through the CDX API: +# https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py +# the CDX API is pretty slow though, so it should be used as a last resort. +def wayback_dl(video: dict, basename: str, output: str) -> int: try: - url = ''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu", - "rl.archive.org/yt/%s"]) - headers = urllib.request.urlopen(url % video["id"]) - contenttype = headers.getheader("Content-Type") - if contenttype == "video/webm": - ext = "webm" - elif contenttype == "video/mp4": - ext = "mp4" - else: - raise HTTPError(url=None, code=None, msg=None, - hdrs=None, fp=None) - urllib.request.urlretrieve(url % video["id"], "%s.%s" % (basename, ext), - reporthook) + url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv" + "e.org/yt/%s" % video["id"]) + with urllib.request.urlopen(url) as headers: + contenttype = headers.getheader("Content-Type") + if contenttype == "video/webm" or contenttype == "video/mp4": + ext = contenttype.split("/")[-1] + else: + raise HTTPError(url=None, code=None, msg=None, + hdrs=None, fp=None) + with open("%s.%s" % (basename, ext), "wb") as f: + f.write(headers.read()) print(" downloaded %s.%s" % (basename, ext)) return 0 except TimeoutError: return 1 except HTTPError: - print(" video not available on the Wayback Machine!") - return 0 + # dont keep trying + return 2 except Exception as e: - print(" unknown error downloading video!\n") + print(" unknown error downloading video!") print(e) - return 0 - - -def ia_file_legit(path: str, vidid: str) -> bool: - return True if re.search(''.join([r"((?:.+?-)?", vidid, r"\.(?:mp4|jpg|web" - r"p|mkv|webm|info\\.json|description|annotations.xml" - "))"]), - path) else False + return 1 -def internet_archive_dl(video: dict, basename: str, output: str) -> int: - if internetarchive.get_item("youtube-%s" % video["id"]).exists: - flist = [f.name for f in internetarchive.get_files("youtube-%s" % video["id"]) if ia_file_legit(f.name, video["id"])] - while True: - try: - internetarchive.download("youtube-%s" % video["id"], - files=flist, verbose=True, - destdir=output, - no_directory=True, - ignore_existing=True, - retries=9999) - break - except ConnectTimeout: - continue - except Exception as e: - print(e) - return 0 - if flist[0][:len(video["id"])] == video["id"]: - for fname in flist: - if os.path.exists("%s/%s" % (output, fname)): - os.replace("%s/%s" % (output, fname), - "%s-%s" % (basename.rsplit("-", 1)[0], - fname)) - return 1 +# Internet Archive (tubeup) +def ia_dl(video: dict, basename: str, output: str) -> int: + def ia_file_legit(file: internetarchive.File, vidid: str) -> bool: + # FIXME: + # + # There are some items on IA that combine the old tubeup behavior + # (i.e., including the sanitized video name before the ID) + # and the new tubeup behavior (filename only contains the video ID) + # hence we will download the entire video twice. + # + # This isn't much of a problem anymore (and hasn't been for like 3 + # years), since I contributed code to not upload something if there + # is already something there. However we should handle this case + # anyway. + # + # Additionally, there are some items that have duplicate video files + # (from when the owners changed the title). We should ideally only + # download unique files. IA seems to provide SHA1 hashes... + # + # We should also check if whether the copy on IA is higher quality + # than a local copy... :) + if not re.search(r"((?:.+?-)?" + vidid + r"\.(?:mp4|jpg|webp|mkv|w" + r"ebm|info\.json|description|annotations.xml))", + f.name): + return False + + # now, check the metadata + print(f) + return True + + + if not internetarchive.get_item("youtube-%s" % video["id"]).exists: + return 2 + + flist = [ + f.name + for f in internetarchive.get_files("youtube-%s" % video["id"]) + if ia_file_legit(f.name, video["id"]) + ] + while True: + try: + internetarchive.download("youtube-%s" % video["id"], files=flist, + verbose=True, destdir=output, + no_directory=True, ignore_existing=True, + retries=9999) + break + except ConnectTimeout: + time.sleep(1) + continue + except Exception as e: + print(e) + return 1 + + # Newer versions of tubeup save only the video ID. + # Account for this by replacing it. + # + # paper/2025-08-30: fixed a bug where video IDs with hyphens + # would incorrectly truncate + for fname in flist: + # ignore any files whose names are not simply the ID + if os.path.splitext(fname)[0] != video["id"]: + continue + + if os.path.exists("%s/%s" % (output, fname)): + os.replace("%s/%s" % (output, fname), + "%s.%s" % (basename, os.path.splitext(fname))[1]) return 0 -ytdl_opts = { - "retries": 100, - "nooverwrites": True, - "call_home": False, - "quiet": True, - "writeinfojson": True, - "writedescription": True, - "writethumbnail": True, - "writeannotations": True, - "writesubtitles": True, - "allsubtitles": True, - "addmetadata": True, - "continuedl": True, - "embedthumbnail": True, - "format": "bestvideo+bestaudio/best", - "restrictfilenames": True, - "no_warnings": True, - "progress_hooks": [ytdl_hook], - "logger": MyLogger(), - "ignoreerrors": False, -} +def ytdlp_dl(video: dict, basename: str, output: str) -> int: + # intentionally ignores all messages besides errors + class MyLogger(object): + def debug(self, msg): + pass + + def warning(self, msg): + pass + + def error(self, msg): + print(" " + msg) + pass + + + def ytdl_hook(d) -> None: + if d["status"] == "finished": + print(" downloaded %s: 100%% " % (os.path.basename(d["filename"]))) + if d["status"] == "downloading": + print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), + d["_percent_str"]), end="") + if d["status"] == "error": + print("\n an error occurred downloading %s!" + % (os.path.basename(d["filename"]))) + + ytdl_opts = { + "retries": 100, + "nooverwrites": True, + "call_home": False, + "quiet": True, + "writeinfojson": True, + "writedescription": True, + "writethumbnail": True, + "writeannotations": True, + "writesubtitles": True, + "allsubtitles": True, + "addmetadata": True, + "continuedl": True, + "embedthumbnail": True, + "format": "bestvideo+bestaudio/best", + "restrictfilenames": True, + "no_warnings": True, + "progress_hooks": [ytdl_hook], + "logger": MyLogger(), + "ignoreerrors": False, + + #mm, output template + "outtmpl": output + "/%(title)s-%(id)s.%(ext)s", + } + + with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: + try: + ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"]) + return 0 + except DownloadError: + return 2 + except Exception as e: + print(" unknown error downloading video!\n") + print(e) + + return 1 + + +# TODO: There are multiple other youtube archival websites available. +# Most notable is https://findyoutubevideo.thetechrobo.ca . +# This combines a lot of sparse youtube archival services, and has +# a convenient API we can use. Nice! +# +# There is also the "Distributed YouTube Archive" which is totally +# useless because there's way to automate it... + +############################################################################## def main(): + # generator; creates a list of files, and returns the parsed form of + # each. note that the parser is not necessarily + def load_split_files(path: str): + list_files = [] + + # build the path list + if not os.path.isdir(path): + list_files.append(path) + else: + for fi in os.listdir(path): + if re.search(r"vids[0-9\-]+?\.json", fi): + list_files.append(path + "/" + fi) + + # now open each as a json + for fi in list_files: + print(fi) + with open(fi, "r", encoding="utf-8") as infile: + if simdjson: + # Using this is a lot faster in SIMDJSON, since instead + # of converting all of the JSON key/value pairs into + # native Python objects, they stay in an internal state. + # + # This means we only get the stuff we absolutely need, + # which is the uploader ID, and copy everything else + # if the ID is one we are looking for. + parser = json.Parser() + yield parser.parse(infile.read()) + del parser + else: + yield json.load(infile) + + + def write_metadata(i: dict, basename: str) -> None: + # ehhh + if not os.path.exists(basename + ".info.json"): + with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile: + try: + # orjson outputs bytes + jsonfile.write(json.dumps(i).decode("utf-8")) + except AttributeError: + # everything else outputs a string + jsonfile.write(json.dumps(i)) + print(" saved %s" % os.path.basename(jsonfile.name)) + if not os.path.exists(basename + ".description"): + with open(basename + ".description", "w", + encoding="utf-8") as descfile: + descfile.write(i["description"]) + print(" saved %s" % os.path.basename(descfile.name)) + args = docopt.docopt(__doc__) if not os.path.exists(args["--output"]): os.mkdir(args["--output"]) - for f in load_split_files(args["--database"]): - for i in f: - uploader = i["uploader_id"] if "uploader_id" in i else None - for url in args["<url>"]: - channel = url.split("/")[-1] + channels = dict() + + for url in args["<url>"]: + chn = url.split("/")[-1] + channels[chn] = {"output": "%s/%s" % (args["--output"], chn)} + + for channel in channels.values(): + if not os.path.exists(channel["output"]): + os.mkdir(channel["output"]) + + # find videos in the database. + # + # despite how it may seem, this is actually really fast, and fairly + # memory efficient too (but really only if we're using simdjson...) + videos = [ + i if not simdjson else i.as_dict() + for f in load_split_files(args["--database"]) + for i in (f if not "videos" in f else f["videos"]) # logic is reversed kinda, python is weird + if "uploader_id" in i and i["uploader_id"] in channels + ] + + while True: + if len(videos) == 0: + break + + videos_copy = videos + + for i in videos_copy: + channel = channels[i["uploader_id"]] + + # precalculated for speed + output = channel["output"] - output = "%s/%s" % (args["--output"], channel) - if not os.path.exists(output): - os.mkdir(output) - ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s" + print("%s:" % i["id"]) + basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], + restricted=True), i["id"]) + files = [y + for p in ["mkv", "mp4", "webm"] + for y in Path(output).glob(("*-%s." + p) % i["id"])] + if files: + print(" video already downloaded!") + videos.remove(i) + write_metadata(i, basename) + continue + + # high level "download" function. + def dl(video: dict, basename: str, output: str): + dls = [] + + if ytdlp_works: + dls.append({ + "func": ytdlp_dl, + "name": "using yt-dlp", + }) - if uploader == channel: - print(uploader, channel) - print("%s:" % i["id"]) - basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], - restricted=True), i["id"]) - files = [y for p in ["mkv", "mp4", "webm"] for y in list(Path(output).glob(("*-%s." + p) % i["id"]))] - if files: - print(" video already downloaded!") - write_metadata(i, basename) + if ia_works: + dls.append({ + "func": ia_dl, + "name": "from the Internet Archive", + }) + + dls.append({ + "func": desirintoplaisir_dl, + "name": "from LMIJLM/DJ Plaisir's archive", + }) + dls.append({ + "func": ghostarchive_dl, + "name": "from GhostArchive" + }) + dls.append({ + "func": wayback_dl, + "name": "from the Wayback Machine" + }) + + for dl in dls: + print(" attempting to download %s" % dl["name"]) + r = dl["func"](i, basename, output) + if r == 0: + # all good, video's downloaded + return 0 + elif r == 2: + # video is unavailable here + print(" oops, video is not available there...") continue - # this code is *really* ugly... todo a rewrite? - with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: - try: - ytdl.extract_info("https://youtube.com/watch?v=%s" - % i["id"]) - continue - except DownloadError: - print(" video is not available! attempting to find In" - "ternet Archive pages of it...") - except Exception as e: - print(" unknown error downloading video!\n") - print(e) - if internet_archive_dl(i, basename, output): # if we can't download from IA - continue - print(" video does not have a Internet Archive page! attem" - "pting to download from the Wayback Machine...") - while True: - if wayback_machine_dl(i, basename) == 0: # success - break - time.sleep(5) - continue - write_metadata(i, basename) + elif r == 1: + # error while downloading; likely temporary. + # TODO we should save which downloader the video + # was on, so we can continue back at it later. + return 1 + # video is unavailable everywhere + return 2 + + r = dl(i, basename, output) + if r == 1: + continue + + # video is downloaded, or it's totally unavailable, so + # remove it from being checked again. + videos.remove(i) + # ... and then dump the metadata, if there isn't any on disk. + write_metadata(i, basename) + + if r == 0: + # video is downloaded + continue + + # video is unavailable; write out the metadata. + print(" video is unavailable everywhere; dumping out metadata only") if __name__ == "__main__":
