channeldownloader.py @ 16:088d9a3a2524
improvements to IA downloader
now we explicitly ignore any file not "original". this seems to
filter out derivative files (such as ogv and other shit we don't
want) but keeps some of the toplevel metadata
| field | value |
|---|---|
| author | Paper <paper@tflc.us> |
| date | Sat, 28 Feb 2026 14:38:04 -0500 |
| parents | 615e1ca0212a |
| children | 0d10b2ce0140 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes youtube videos from a channel from
# a variety of sources
# Copyright (c) 2021-2025 Paper <paper@tflc.us>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Usage:
  channeldownloader.py <url>... (--database <file>) [--output <folder>]
  channeldownloader.py -h | --help

Arguments:
  <url>                    YouTube channel URL to download from

Options:
  -h --help                Show this screen
  -o --output <folder>     Output folder, relative to the current
                           directory [default: .]
  -d --database <file>     yt-dlp style database of videos. Should contain
                           an array of yt-dlp .info.json data. For example,
                           FinnOtaku's YTPMV metadata archive.
"""

# Built-in python stuff (no possible missing dependencies)
from __future__ import print_function
import docopt
import os
import re
import time
import typing
import urllib.request
import ssl
import io
import shutil
import xml.etree.ElementTree as XmlET
from urllib.error import HTTPError
from pathlib import Path

# We can utilize special simdjson features if it is available
simdjson = False
try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

ytdlp_works = False
try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    print("failed to import yt-dlp!")
    print("downloading from YouTube directly will not work.")

ia_works = False
try:
    import internetarchive
    from requests.exceptions import ConnectTimeout
    ia_works = True
except ImportError:
    print("failed to import the Internet Archive's python library!")
    print("downloading from IA will not work.")

zipfile_works = False
try:
    import zipfile
    zipfile_works = True
except ImportError:
    print("failed to import zipfile!")
    print("loading the database from a .zip file will not work.")

##############################################################################
## DOWNLOADERS

# All downloaders should be a function under this signature:
#     dl(video: dict, basename: str, output: str) -> int
# where:
#     'video': the .info.json scraped from the YTPMV metadata archive.
#     'basename': the basename output to write as.
#     'output': the output directory.
# yes, it's weird, but I don't care ;)
#
# Magic return values:
#     0 -- all good, video is downloaded
#     1 -- error downloading video; it may still be available if we try again
#     2 -- video is proved totally unavailable here. give up
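# As an illustration of the contract (this stub is NOT part of the real
# downloader chain below), a downloader for a source that can never serve
# anything would look like this:
#
#     def nowhere_dl(video: dict, basename: str, output: str) -> int:
#         # this source never has the video, so give up immediately
#         return 2
#
# A real downloader writes "<basename>.<ext>" to disk and maps network
# failures to 1 (temporary; retry later) or 2 (permanent; give up).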
# Basic downloader template.
#
# This does a brute-force of all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be created using the video ID and
# extension. For example:
#     https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
                      linktemplate: str, vexts: list, iexts: list) -> int:
    # actual downloader
    def basic_dl_impl(vid: str, ext: str) -> int:
        url = (linktemplate % (vid, ext))
        try:
            with urllib.request.urlopen(url) as headers:
                with open("%s.%s" % (basename, ext), "wb") as f:
                    f.write(headers.read())
                print(" downloaded %s.%s" % (basename, ext))
                return 0
        except TimeoutError:
            return 1
        except HTTPError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!")
            print(e)
            return 1

    for exts in [vexts, iexts]:
        for ext in exts:
            r = basic_dl_impl(video["id"], ext)
            if r == 0:
                break  # done!
            elif r == 1:
                # timeout; try again later?
                return 1
            elif r == 2:
                continue
        else:
            # we did not break out of the loop, which means all
            # extensions in this list were unavailable. that is only
            # fatal for the video extensions; a missing thumbnail
            # (or an empty iexts) shouldn't fail the whole download.
            if exts is vexts:
                return 2

    # video was downloaded successfully
    return 0

# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
                             "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
                             ["mp4", "webm", "mkv"],
                             []  # none
                             )

# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, however it seems to be growing to other stuff.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
                             "https://media.desirintoplaisir.net/content/%s.%s",
                             ["mp4", "webm", "mkv"],
                             ["webp"]
                             )

# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
#     https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
#  the CDX API is pretty slow though, so it should be used as a last resort.
def wayback_dl(video: dict, basename: str, output: str) -> int:
    try:
        url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
               "e.org/yt/%s" % video["id"])
        with urllib.request.urlopen(url) as headers:
            contenttype = headers.getheader("Content-Type")
            if contenttype == "video/webm" or contenttype == "video/mp4":
                ext = contenttype.split("/")[-1]
            else:
                raise HTTPError(url=None, code=None, msg=None, hdrs=None,
                                fp=None)
            with open("%s.%s" % (basename, ext), "wb") as f:
                f.write(headers.read())
            print(" downloaded %s.%s" % (basename, ext))
            return 0
    except TimeoutError:
        return 1
    except HTTPError:
        # dont keep trying
        return 2
    except Exception as e:
        print(" unknown error downloading video!")
        print(e)
        return 1

# Also captures the ID for comparison
IA_REGEX = re.compile(r"(?:(?P<date>\d{8}) - )?(?P<title>.+?)?(?:-| \[)?(?:(?P<id>[A-z0-9_\-]{11})]?|(?: \((?P<format>(?:(?:(?P<resolution>\d+)p_(?P<fps>\d+)fps_(?P<vcodec>H264)-)?(?P<abitrate>\d+)kbit_(?P<acodec>AAC|Vorbis))|BQ|Description)\)))\.(?P<extension>mp4|info\.json|description|annotations\.xml|webp|mkv|webm|jpg|jpeg|ogg|txt|m4a)$")
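# Hypothetical filenames of each shape this regex is meant to accept
# (the IDs and titles below are placeholders, not real items):
#
#     20141225 - Some Title [XXXXXXXXXXX].mp4     date + title + [ID]
#     Some_Title-XXXXXXXXXXX.info.json            old tubeup
#     XXXXXXXXXXX.webm                            new tubeup
#     Some Title (128kbit_AAC).m4a                JDownloader format tag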
# Internet Archive (tubeup)
def ia_dl(video: dict, basename: str, output: str) -> int:
    def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool:
        # FIXME:
        #
        # There are some items on IA that combine the old tubeup behavior
        # (i.e., including the sanitized video name before the ID)
        # and the new tubeup behavior (filename only contains the video ID)
        # hence we will download the entire video twice.
        #
        # This isn't much of a problem anymore (and hasn't been for like 3
        # years), since I contributed code to not upload something if there
        # is already something there. However we should handle this case
        # anyway.
        #
        # Additionally, there are some items that have duplicate video files
        # (from when the owners changed the title). We should ideally only
        # download unique files. IA seems to provide SHA1 hashes...
        #
        # We should also check whether the copy on IA is higher quality
        # than a local copy... :)
        IA_ID = "youtube-%s" % vidid

        # Ignore IA generated thumbnails
        if f.startswith("%s.thumbs/" % IA_ID) or f == "__ia_thumb.jpg":
            return False

        for i in ["_archive.torrent", "_files.xml", "_meta.sqlite",
                  "_meta.xml"]:
            if f == (IA_ID + i):
                return False

        # Try to match with our known filename regex
        # This properly matches:
        #     ??????????? - YYYYMMDD - TITLE [ID].EXTENSION
        #     old tubeup  - TITLE-ID.EXTENSION
        #     tubeup      - ID.EXTENSION
        #     JDownloader - TITLE (FORMAT).EXTENSION
        # (Possibly we should match other filenames too??)
        m = re.match(IA_REGEX, f)
        if m is None:
            return False

        if m.group("id"):
            return (m.group("id") == vidid)
        elif m.group("title") is not None:
            def asciify(s: str) -> str:
                # Replace all non-ASCII chars with underscores, and get rid
                # of any whitespace
                return ''.join([i if ord(i) >= 0x20 and ord(i) < 0x80
                                and i not in "/\\" else '_'
                                for i in s]).strip()
            if asciify(m.group("title")) == asciify(vidtitle):
                return True  # Close enough

        # Uh oh
        return False

    def ia_get_original_files(identifier: str) -> typing.Optional[list]:
        def ia_xml(identifier: str) -> typing.Optional[str]:
            for _ in range(1, 9999):
                try:
                    with urllib.request.urlopen("https://archive.org/download/%s/%s_files.xml" % (identifier, identifier)) as req:
                        return req.read().decode("utf-8")
                except HTTPError as e:
                    if e.code == 404 or e.code == 503:
                        return None
                    time.sleep(5)
            return None

        d = ia_xml(identifier)
        if d is None:
            return None

        try:
            # Now parse the XML and make a list of each original file
            return [x.attrib["name"]
                    for x in filter(lambda x: x.attrib["source"] == "original",
                                    XmlET.fromstring(d))]
        except Exception as e:
            print(e)
            return None

    originalfiles = ia_get_original_files("youtube-%s" % video["id"])
    if not originalfiles:
        return 2

    flist = [
        f for f in originalfiles
        if ia_file_legit(f, video["id"],
                         video["fulltitle"] if "fulltitle" in video
                         else video["title"])
    ]
    if not flist:
        return 2  # ??????

    while True:
        try:
            internetarchive.download("youtube-%s" % video["id"], files=flist,
                                     verbose=True, ignore_existing=True,
                                     retries=9999)
            break
        except ConnectTimeout:
            time.sleep(1)
            continue
        except Exception as e:
            print(e)
            return 1

    # Newer versions of tubeup save only the video ID.
    # Account for this by replacing it.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    #
    # paper/2026-02-27: an update in the IA python library changed
    # the way destdir works, so it just gets entirely ignored.
    def getext(s: str, vidid: str) -> typing.Optional[str]:
        # special cases
        for i in [".info.json", ".annotations.xml"]:
            if s.endswith(i):
                return i

        # Handle JDownloader "TITLE (Description).txt"
        if s.endswith(" (Description).txt"):
            return ".description"

        # Catch-all for remaining extensions
        spli = os.path.splitext(s)
        if spli is None or len(spli) != 2:
            return None
        return spli[1]

    for fname in flist:
        ondisk = "youtube-%s/%s" % (video["id"], fname)
        if not os.path.exists(ondisk):
            continue

        ext = getext(fname, video["id"])
        if ext is None:
            continue

        os.replace(ondisk, "%s%s" % (basename, ext))

    shutil.rmtree("youtube-%s" % video["id"])
    return 0

def ytdlp_dl(video: dict, basename: str, output: str) -> int:
    # intentionally ignores all messages besides errors
    class MyLogger(object):
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)

    def ytdl_hook(d) -> None:
        if d["status"] == "finished":
            print(" downloaded %s: 100%% "
                  % (os.path.basename(d["filename"])))
        if d["status"] == "downloading":
            print(" downloading %s: %s\r"
                  % (os.path.basename(d["filename"]), d["_percent_str"]),
                  end="")
        if d["status"] == "error":
            print("\n an error occurred downloading %s!"
                  % (os.path.basename(d["filename"])))

    ytdl_opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [ytdl_hook],
        "logger": MyLogger(),
        "ignoreerrors": False,
        # mm, output template
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }

    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
            return 0
        except DownloadError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!\n")
            print(e)
            return 1

# TODO: There are multiple other youtube archival websites available.
#  Most notable is https://findyoutubevideo.thetechrobo.ca . This combines
#  a lot of sparse youtube archival services, and has a convenient API we
#  can use. Nice!
#
# There is also the "Distributed YouTube Archive" which is totally
# useless because there's no way to automate it...
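# For reference, the downloaders above only need a couple of keys from
# each record, and main() below additionally reads the uploader ID and the
# description. A record as minimal as this (values are placeholders, not
# part of any real archive) would flow through the whole pipeline:
#
#     {
#         "id": "XXXXXXXXXXX",           # 11-character YouTube video ID
#         "uploader_id": "SomeChannel",  # matched against the channel URLs
#         "title": "Some Title",         # "fulltitle" is preferred if set
#         "description": "..."           # dumped to the .description file
#     }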
##############################################################################

def main():
    def load_split_files(path: str):
        def cruft(isdir: bool, listdir, openf):
            # build the path list
            if not isdir:
                list_files = [path]
            else:
                list_files = filter(
                    lambda x: re.search(r"vids[0-9\-]+?\.json", x),
                    listdir())

            # now open each as a json
            for fi in list_files:
                print(fi)
                with openf(fi, "r") as infile:
                    if simdjson:
                        # Using this is a lot faster in simdjson, since
                        # instead of converting all of the JSON key/value
                        # pairs into native Python objects, they stay in an
                        # internal state.
                        #
                        # This means we only get the stuff we absolutely
                        # need, which is the uploader ID, and copy
                        # everything else if the ID is one we are looking
                        # for.
                        parser = json.Parser()
                        yield parser.parse(infile.read())
                        del parser
                    else:
                        try:
                            yield json.load(infile)
                        except AttributeError:
                            # orjson has no load(); parse the whole blob
                            yield json.loads(infile.read())

        try:
            # fall through to plain file/directory handling if the
            # database isn't a zip (or zipfile isn't available)
            if not zipfile_works or os.path.isdir(path):
                raise Exception
            with zipfile.ZipFile(path, "r") as myzip:
                yield from cruft(True, lambda: myzip.namelist(),
                                 lambda f, m: io.TextIOWrapper(
                                     myzip.open(f, mode=m),
                                     encoding="utf-8"))
        except Exception:
            yield from cruft(os.path.isdir(path),
                             lambda: os.listdir(path),
                             lambda f, m: open(path + "/" + f, m,
                                               encoding="utf-8"))

    def write_metadata(i: dict, basename: str) -> None:
        # ehhh
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w",
                      encoding="utf-8") as jsonfile:
                try:
                    # orjson outputs bytes
                    jsonfile.write(json.dumps(i).decode("utf-8"))
                except AttributeError:
                    # everything else outputs a string
                    jsonfile.write(json.dumps(i))
                print(" saved %s" % os.path.basename(jsonfile.name))
        if not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w",
                      encoding="utf-8") as descfile:
                descfile.write(i["description"])
                print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()
    for url in args["<url>"]:
        chn = url.split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}

    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # find videos in the database.
    #
    # despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f if "videos" not in f else f["videos"])
        # logic is reversed kinda, python is weird
        if "uploader_id" in i and i["uploader_id"] in channels
    ]
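    # The database is either a single .json file, or a directory/zip of
    # "vids<digits>.json" chunks. Each chunk holds either a bare array of
    # .info.json records or an object wrapping that array; schematically
    # (field values elided):
    #
    #     [ {"id": ..., "uploader_id": ..., ...}, ... ]
    # or
    #     { "videos": [ {"id": ..., "uploader_id": ..., ...}, ... ] }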
    while True:
        if len(videos) == 0:
            break

        # iterate over a copy: we remove videos from the real list as they
        # complete, and removing from a list while iterating it skips
        # elements
        videos_copy = videos[:]
        for i in videos_copy:
            channel = channels[i["uploader_id"]]

            # precalculated for speed
            output = channel["output"]

            print("%s:" % i["id"])

            basename = "%s/%s-%s" % (output,
                                     sanitize_filename(i["title"],
                                                       restricted=True),
                                     i["id"])

            def filenotworthit(f) -> bool:
                try:
                    return bool(os.path.getsize(f))
                except OSError:
                    return False

            pathoutput = Path(output)

            # This is terrible
            files = list(filter(filenotworthit,
                                [y for p in ["mkv", "mp4", "webm"]
                                 for y in pathoutput.glob(
                                     ("*-%s." + p) % i["id"])]))
            if files:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function.
            def dl(video: dict, basename: str, output: str):
                dls = []
                if ytdlp_works:
                    dls.append({
                        "func": ytdlp_dl,
                        "name": "using yt-dlp",
                    })
                if ia_works:
                    dls.append({
                        "func": ia_dl,
                        "name": "from the Internet Archive",
                    })
                dls.append({
                    "func": desirintoplaisir_dl,
                    "name": "from LMIJLM/DJ Plaisir's archive",
                })
                dls.append({
                    "func": ghostarchive_dl,
                    "name": "from GhostArchive",
                })
                dls.append({
                    "func": wayback_dl,
                    "name": "from the Wayback Machine",
                })

                for d in dls:
                    print(" attempting to download %s" % d["name"])
                    r = d["func"](video, basename, output)
                    if r == 0:
                        # all good, video's downloaded
                        return 0
                    elif r == 2:
                        # video is unavailable here
                        print(" oops, video is not available there...")
                        continue
                    elif r == 1:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        #  was on, so we can continue back at it later.
                        return 1

                # video is unavailable everywhere
                return 2

            r = dl(i, basename, output)
            if r == 1:
                continue

            # video is downloaded, or it's totally unavailable, so
            # remove it from being checked again...
            videos.remove(i)

            # ...and then dump the metadata, if there isn't any on disk.
            write_metadata(i, basename)

            if r == 0:
                # video is downloaded
                continue

            # video is unavailable; write out the metadata.
            print(" video is unavailable everywhere;"
                  " dumping out metadata only")

if __name__ == "__main__":
    main()
