Mercurial > channeldownloader
view channeldownloader.py @ 18:05e71dd6b6ca default tip
no more ia python library
| author | Paper <paper@tflc.us> |
|---|---|
| date | Sat, 28 Feb 2026 22:31:59 -0500 |
| parents | 0d10b2ce0140 |
| children |
line wrap: on
line source
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes youtube videos from a channel from
# a variety of sources
# Copyright (c) 2021-2026 Paper <paper@tflc.us>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Okay, this is a bit of a clusterfuck.
#
# This originated as a script that simply helped me scrape a bunch
# of videos off some deleted channels (in fact, that's still its main
# purpose) and was very lackluster (hardcoded shite everywhere).
# Fortunately in recent times I've cleaned up the code and added some
# other mirrors, as well as improved the archive.org scraper to not
# shoot itself when it encounters an upload that's not from tubeup.
#
# Nevertheless, I still consider much of this file to be dirty hacks,
# especially some of the HTTP stuff.

"""
Usage:
    channeldownloader.py <url>... (--database <file>) [--output <folder>]
    channeldownloader.py -h | --help

Arguments:
    <url>                 YouTube channel URL to download from

Options:
    -h --help             Show this screen
    -o --output <folder>  Output folder, relative to the current
                          directory [default: .]
    -d --database <file>  yt-dlp style database of videos. Should contain
                          an array of yt-dlp .info.json data. For example,
                          FinnOtaku's YTPMV metadata archive.
"""

# Built-in python stuff (no possible missing dependencies)
from __future__ import print_function
import docopt
import enum
import io
import os
import re
import shutil
import ssl
import time
import typing  # bug fix: typing.Optional is referenced below but was never imported
import urllib.parse
import urllib.request
import xml.etree.ElementTree as XmlET
from pathlib import Path
from urllib.error import HTTPError

# We can utilize special simdjson features if it is available
simdjson = False
try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

ytdlp_works = False
try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    print("failed to import yt-dlp!")
    print("downloading from YouTube directly will not work.")

zipfile_works = False
try:
    import zipfile
    zipfile_works = True
except ImportError:
    print("failed to import zipfile!")
    print("loading the database from a .zip file will not work.")

##############################################################################
## DOWNLOADERS

# All downloaders should be a function under this signature:
#   dl(video: dict, basename: str, output: str) -> int
# where:
#   'video': the .info.json scraped from the YTPMV metadata archive.
#   'basename': the basename output to write as.
#   'output': the output directory.
# yes, it's weird, but I don't care ;)


class DownloaderStatus(enum.Enum):
    # Download finished successfully.
    SUCCESS = 0

    # Download failed.
    # Note that this should NOT be used for when the video is unavailable
    # (i.e. error 404); it should only be used when the video cannot be
    # downloaded *at this time*, indicating a server problem. This is very
    # common for the Internet Archive, not sure about others.
    ERROR = 1

    # Video is unavailable from this provider.
    UNAVAILABLE = 2


def download_file(url: str, path: str, guessext: bool = False,
                  length: typing.Optional[int] = None) -> DownloaderStatus:
    """Download a file from `url` to `path`, printing progress to the screen.

    If `guessext` is True, an extension (.mp4/.webm) is appended to `path`
    based on the Content-Type header; unknown types are treated as ERROR.
    `length` is the expected byte size, if the caller already knows it.

    Returns a DownloaderStatus; HTTP errors map to UNAVAILABLE, timeouts
    and anything unexpected map to ERROR.
    """
    # Download in 32KiB chunks
    CHUNK_SIZE = 32768

    try:
        with urllib.request.urlopen(url) as http:
            if length is None:
                # Check whether the URL gives us Content-Length.
                # This is useful for preallocating the file and for
                # displaying how much we've downloaded overall as a percent.
                length = http.getheader("Content-Length", default=None)
                try:
                    if length is not None:
                        length = int(length)
                except (TypeError, ValueError):
                    # fuck it
                    length = None

            if guessext:
                # Guess file extension from MIME type
                mime = http.getheader("Content-Type", default=None)
                if not mime:
                    return DownloaderStatus.ERROR
                if mime == "video/mp4":
                    path += ".mp4"
                elif mime == "video/webm":
                    path += ".webm"
                else:
                    return DownloaderStatus.ERROR

            par = os.path.dirname(path)
            # bug fix: guard against an empty dirname (path with no folder),
            # which would make os.makedirs("") blow up
            if par and not os.path.isdir(par):
                os.makedirs(par)

            with open(path, "wb") as f:
                # bug fix: the original called f.truncate(length) before `f`
                # even existed (inside the Content-Length try above); the
                # resulting NameError was swallowed by a bare except, which
                # also nuked `length`. Preallocate here instead, now that
                # the file is actually open.
                if length is not None:
                    f.truncate(length)

                # Download the entire file
                while True:
                    data = http.read(CHUNK_SIZE)
                    if not data:
                        break
                    f.write(data)
                    # Don't exceed 79 chars.
                    print("\r downloading to %s, " % path, end="")
                    if length:
                        print("%.2f%%" % (f.tell() / length * 100.0), end="")
                    else:
                        print("%.2f MiB" % (f.tell() / (1 << 20)), end="")
                print("\r downloaded to %s " % path)
                if length is not None and length != f.tell():
                    # Server lied about what the length was?
                    print(" INFO: HTTP server's Content-Length header lied??")
    except TimeoutError:
        return DownloaderStatus.ERROR
    except HTTPError:
        return DownloaderStatus.UNAVAILABLE
    except Exception as e:
        print(" unknown error downloading video;", e)
        return DownloaderStatus.ERROR
    return DownloaderStatus.SUCCESS

# Basic downloader template.
#
# This does a brute-force of all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be created using the video ID and
# extension. For example:
#   https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
                      linktemplate: str, vexts: list,
                      iexts: list) -> DownloaderStatus:
    """Try every extension in `vexts` (videos) then `iexts` (images)
    against `linktemplate` until one downloads.

    Returns SUCCESS once every non-empty group has yielded a file,
    ERROR on a (presumably temporary) server problem, and UNAVAILABLE
    when a whole group has no working extension.
    """
    # actual downloader
    def basic_dl_impl(vid: str, ext: str) -> DownloaderStatus:
        url = (linktemplate % (vid, ext))
        return download_file(url, "%s.%s" % (basename, ext))

    for exts in (vexts, iexts):
        # bug fix: an empty group (e.g. ghostarchive passes iexts=[]) used
        # to fall straight through to the for-else below and report
        # UNAVAILABLE even though the video itself had just downloaded.
        if not exts:
            continue
        for ext in exts:
            r = basic_dl_impl(video["id"], ext)
            if r == DownloaderStatus.SUCCESS:
                break  # done!
            elif r == DownloaderStatus.ERROR:
                # timeout; try again later?
                return DownloaderStatus.ERROR
            # UNAVAILABLE: try the next extension
        else:
            # we did not break out of the loop,
            # which means all extensions were unavailable
            return DownloaderStatus.UNAVAILABLE
    # video was downloaded successfully
    return DownloaderStatus.SUCCESS


# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    return basic_dl_template(video, basename, output,
        "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
        ["mp4", "webm", "mkv"],
        []  # none
    )


# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, however it seems to be growing to other stuff.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    return basic_dl_template(video, basename, output,
        "https://media.desirintoplaisir.net/content/%s.%s",
        ["mp4", "webm", "mkv"],
        ["webp"]
    )


# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
# https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
# the CDX API is pretty slow though, so it should be used as a last resort.
def wayback_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Download a video via the Wayback Machine's yt fake-URL redirect.

    guessext=True because the extension has to be inferred from the
    Content-Type of whatever the Wayback Machine serves back.
    """
    PREFIX = "https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/"
    return download_file(PREFIX + video["id"], basename, True)


# Also captures the ID for comparison.
# bug fix: the ID character class was [A-z...], which also matches
# "[", "]", "^", "_" and backtick; YouTube IDs are [A-Za-z0-9_-].
IA_REGEX = re.compile(r"(?:(?P<date>\d{8}) - )?(?P<title>.+?)?(?:-| \[)?(?:(?P<id>[A-Za-z0-9_\-]{11})]?|(?: \((?P<format>(?:(?:(?P<resolution>\d+)p_(?P<fps>\d+)fps_(?P<vcodec>H264)-)?(?P<abitrate>\d+)kbit_(?P<acodec>AAC|Vorbis))|BQ|Description)\)))\.(?P<extension>mp4|info\.json|description|annotations\.xml|webp|mkv|webm|jpg|jpeg|ogg|txt|m4a)$")


# Internet Archive (tubeup)
#
# NOTE: We don't actually need the python library anymore; we already
# explicitly download the file listing using our own logic, so there's
# really nothing stopping us from going ahead and downloading everything
# else using the download_file function.
def ia_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Download all original files of the youtube-<id> item on archive.org.

    Files are fetched into a youtube-<id>/ staging folder, renamed onto
    `basename` with their proper extension, and the staging folder removed.
    """
    def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool:
        # Decide whether an item file actually belongs to this video.
        #
        # FIXME:
        # There are some items on IA that combine the old tubeup behavior
        # (i.e., including the sanitized video name before the ID)
        # and the new tubeup behavior (filename only contains the video ID)
        # hence we will download the entire video twice.
        #
        # This isn't much of a problem anymore (and hasn't been for like 3
        # years), since I contributed code to not upload something if there
        # is already something there. However we should handle this case
        # anyway.
        #
        # Additionally, there are some items that have duplicate video files
        # (from when the owners changed the title). We should ideally only
        # download unique files. IA seems to provide SHA1 hashes...
        #
        # We should also check if whether the copy on IA is higher quality
        # than a local copy... :)
        IA_ID = "youtube-%s" % vidid

        # Ignore IA generated thumbnails
        if f.startswith("%s.thumbs/" % IA_ID) or f == "__ia_thumb.jpg":
            return False
        for i in ["_archive.torrent", "_files.xml", "_meta.sqlite", "_meta.xml"]:
            if f == (IA_ID + i):
                return False

        # Try to match with our known filename regex.
        # This properly matches:
        #   ???????????  - YYYYMMDD - TITLE [ID].EXTENSION
        #   old tubeup   - TITLE-ID.EXTENSION
        #   tubeup       - ID.EXTENSION
        #   JDownloader  - TITLE (FORMAT).EXTENSION
        # (Possibly we should match other filenames too??)
        m = re.match(IA_REGEX, f)
        if m is None:
            return False
        if m.group("id"):
            return (m.group("id") == vidid)
        elif m.group("title") is not None:
            def asciify(s: str) -> str:
                # Replace all non-ASCII chars with underscores, and get rid
                # of any whitespace
                return ''.join([i if ord(i) >= 0x20 and ord(i) < 0x80 and i not in "/\\" else '_' for i in s]).strip()
            if asciify(m.group("title")) == asciify(vidtitle):
                return True  # Close enough
        # Uh oh
        return False

    # Returns a list of {"name": ..., "size": ...} dicts for every
    # "original" (non-derived) file in the item, or None on failure.
    # (bug fix: the original annotated this typing.Optional[list], but
    # `typing` was never imported, so calling ia_dl raised NameError.)
    def ia_get_original_files(identifier):
        # Fetch the item's _files.xml listing, retrying on transient errors.
        def ia_xml(identifier):
            for _ in range(1, 9999):
                try:
                    with urllib.request.urlopen("https://archive.org/download/%s/%s_files.xml" % (identifier, identifier)) as req:
                        return req.read().decode("utf-8")
                except HTTPError as e:
                    if e.code == 404 or e.code == 503:
                        return None
                    time.sleep(5)

        d = ia_xml(identifier)
        if d is None:
            return None
        try:
            r = []
            # Now parse the XML and make a list of each original file
            for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d)):
                l = {"name": x.attrib["name"]}
                sz = x.find("size")
                if sz is not None:
                    l["size"] = int(sz.text)
                r.append(l)
            return r
        except Exception as e:
            print(e)
            return None

    IA_IDENTIFIER = "youtube-%s" % video["id"]

    originalfiles = ia_get_original_files(IA_IDENTIFIER)
    if not originalfiles:
        return DownloaderStatus.UNAVAILABLE

    flist = [
        f for f in originalfiles
        if ia_file_legit(f["name"], video["id"],
                         video["title"] if not "fulltitle" in video else video["fulltitle"])
    ]
    if not flist:
        return DownloaderStatus.UNAVAILABLE  # ??????

    for i in flist:
        for _ in range(1, 10):
            path = "%s/%s" % (IA_IDENTIFIER, i["name"])
            r = download_file("https://archive.org/download/" + urllib.parse.quote(path, encoding="utf-8"),
                              path, False,
                              None if not "size" in i else i["size"])
            if r == DownloaderStatus.SUCCESS:
                break
            elif r == DownloaderStatus.ERROR:
                # sleep for a bit and retry
                time.sleep(1.0)
                continue
            elif r == DownloaderStatus.UNAVAILABLE:
                return DownloaderStatus.UNAVAILABLE

    # Guess the canonical extension of an item file.
    # Newer versions of tubeup save only the video ID; account for this
    # by renaming onto `basename` below.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    #
    # paper/2026-02-27: an update in the IA python library changed
    # the way destdir works, so it just gets entirely ignored.
    def getext(s, vidid):
        # special cases
        for i in [".info.json", ".annotations.xml"]:
            if s.endswith(i):
                return i
        # Handle JDownloader "TITLE (Description).txt"
        if s.endswith(" (Description).txt"):
            return ".description"
        # Catch-all for remaining extensions
        spli = os.path.splitext(s)
        if spli is None or len(spli) != 2:
            return None
        return spli[1]

    for f in flist:
        ondisk = "youtube-%s/%s" % (video["id"], f["name"])
        if not os.path.exists(ondisk):
            continue
        ext = getext(f["name"], video["id"])
        if ext is None:
            continue
        os.replace(ondisk, "%s%s" % (basename, ext))

    shutil.rmtree("youtube-%s" % video["id"])
    return DownloaderStatus.SUCCESS


def ytdlp_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
    """Download a video directly from YouTube via yt-dlp."""
    # intentionally ignores all messages besides errors
    class MyLogger(object):
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)

    def ytdl_hook(d) -> None:
        if d["status"] == "finished":
            print(" downloaded %s: 100%% " % (os.path.basename(d["filename"])))
        if d["status"] == "downloading":
            print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), d["_percent_str"]), end="")
        if d["status"] == "error":
            print("\n an error occurred downloading %s!" % (os.path.basename(d["filename"])))

    ytdl_opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [ytdl_hook],
        "logger": MyLogger(),
        "ignoreerrors": False,
        # yummy
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }
    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
            return DownloaderStatus.SUCCESS
        except DownloadError:
            return DownloaderStatus.UNAVAILABLE
        except Exception as e:
            print(" unknown error downloading video!\n")
            print(e)
            return DownloaderStatus.ERROR


# TODO: There are multiple other youtube archival websites available.
# Most notable is https://findyoutubevideo.thetechrobo.ca .
# This combines a lot of sparse youtube archival services, and has
# a convenient API we can use. Nice!
#
# There is also the "Distributed YouTube Archive" which is totally
# useless because there's no way to automate it...

##############################################################################


def main():
    # Yield parsed JSON documents from the database, which may be a single
    # file, a directory of vids*.json files, or a zip of the same.
    def load_split_files(path: str):
        def cruft(isdir, listdir, openf):
            # build the path list
            if not isdir:
                list_files = [path]
            else:
                list_files = filter(lambda x: re.search(r"vids[0-9\-]+?\.json", x), listdir())
            # now open each as a json
            for fi in list_files:
                print(fi)
                with openf(fi, "r") as infile:
                    if simdjson:
                        # Using this is a lot faster in SIMDJSON, since instead
                        # of converting all of the JSON key/value pairs into
                        # native Python objects, they stay in an internal state.
                        #
                        # This means we only get the stuff we absolutely need,
                        # which is the uploader ID, and copy everything else
                        # if the ID is one we are looking for.
                        parser = json.Parser()
                        yield parser.parse(infile.read())
                        del parser
                    else:
                        yield json.load(infile)

        try:
            if not zipfile_works or os.path.isdir(path):
                raise Exception
            with zipfile.ZipFile(path, "r") as myzip:
                yield from cruft(True, lambda: myzip.namelist(),
                                 lambda f, m: io.TextIOWrapper(myzip.open(f, mode=m), encoding="utf-8"))
        except Exception:
            yield from cruft(os.path.isdir(path),
                             lambda: os.listdir(path),
                             lambda f, m: open(path + "/" + f, m, encoding="utf-8"))

    # Dump .info.json and .description next to the video, if missing.
    def write_metadata(i: dict, basename: str) -> None:
        # ehhh
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
                try:
                    # orjson outputs bytes
                    jsonfile.write(json.dumps(i).decode("utf-8"))
                except AttributeError:
                    # everything else outputs a string
                    jsonfile.write(json.dumps(i))
                print(" saved %s" % os.path.basename(jsonfile.name))
        if not os.path.exists(basename + ".description"):
            # bug fix: not every database entry carries a description;
            # the old code KeyError'd and left an empty file behind
            if i.get("description") is not None:
                with open(basename + ".description", "w", encoding="utf-8") as descfile:
                    descfile.write(i["description"])
                    print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()
    for url in args["<url>"]:
        chn = url.split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}
    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # find videos in the database.
    #
    # despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f if not "videos" in f else f["videos"])
        # logic is reversed kinda, python is weird
        if "uploader_id" in i and i["uploader_id"] in channels
    ]

    while True:
        if len(videos) == 0:
            break

        # bug fix: this used to be a plain alias (videos_copy = videos),
        # so videos.remove(i) mutated the list *while iterating it* and
        # silently skipped every other entry. Iterate a real snapshot.
        videos_copy = videos.copy()
        for i in videos_copy:
            channel = channels[i["uploader_id"]]
            # precalculated for speed
            output = channel["output"]
            print("%s:" % i["id"])
            basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"])

            # True for files that exist and are non-empty.
            def filenotworthit(f) -> bool:
                try:
                    return bool(os.path.getsize(f))
                except OSError:
                    return False

            pathoutput = Path(output)
            # This is terrible
            files = list(filter(filenotworthit,
                                [y for p in ["mkv", "mp4", "webm"]
                                 for y in pathoutput.glob(("*-%s." + p) % i["id"])]))
            if files:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function: walk every provider in
            # priority order until one succeeds or errors out.
            def dl(video: dict, basename: str, output: str):
                dls = []
                if ytdlp_works:
                    dls.append({
                        "func": ytdlp_dl,
                        "name": "using yt-dlp",
                    })
                dls.append({
                    "func": ia_dl,
                    "name": "from the Internet Archive",
                })
                dls.append({
                    "func": desirintoplaisir_dl,
                    "name": "from LMIJLM/DJ Plaisir's archive",
                })
                dls.append({
                    "func": ghostarchive_dl,
                    "name": "from GhostArchive"
                })
                dls.append({
                    "func": wayback_dl,
                    "name": "from the Wayback Machine"
                })
                # bug fix: the loop variable used to shadow this function's
                # own name, and the call passed the closure variable `i`
                # instead of the `video` parameter.
                for entry in dls:
                    print(" attempting to download %s" % entry["name"])
                    r = entry["func"](video, basename, output)
                    if r == DownloaderStatus.SUCCESS:
                        # all good, video's downloaded
                        return DownloaderStatus.SUCCESS
                    elif r == DownloaderStatus.UNAVAILABLE:
                        # video is unavailable here
                        print(" oops, video is not available there...")
                        continue
                    elif r == DownloaderStatus.ERROR:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        # was on, so we can continue back at it later.
                        return DownloaderStatus.ERROR
                return DownloaderStatus.UNAVAILABLE

            r = dl(i, basename, output)
            if r == DownloaderStatus.ERROR:
                continue

            # video is downloaded, or it's totally unavailable, so
            # remove it from being checked again.
            videos.remove(i)

            # ... and then dump the metadata, if there isn't any on disk.
            write_metadata(i, basename)

            if r == DownloaderStatus.SUCCESS:
                # video is downloaded
                continue

            # video is unavailable; write out the metadata.
            print(" video is unavailable everywhere; dumping out metadata only")


if __name__ == "__main__":
    main()
