channeldownloader.py @ 14:03c8fd4069fb (default tip)
*: big refactor, switch to GPLv2, and add README
Okay: we now use a modular approach for downloaders. Each downloader
is provided through a single function which does the fetching (a
minimal sketch of the interface is shown below).
Additionally, the internetarchive library is now optional, for users
who do not want to install it.
yt-dlp is still necessary, though, for its sanitize_filename function.
If and when I get to adding vanity features (such as finding the best
possible source by comparing resolution and bitrate), I'll probably
separate out all of the downloaders into different files.
I also moved this project to a separate repository from 'codedump',
keeping all of the relevant commit history :)
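
To make the new interface concrete before the listing: every downloader below is a function taking the video's .info.json dict, the output basename, and the output directory, and returning one of the magic values 0/1/2 spelled out in the DOWNLOADERS section of the source. Here is a minimal sketch of such a downloader; the `example_dl` name and the mirror URL are made up for illustration and are not part of this repository:

```python
import urllib.request
from urllib.error import HTTPError

# Hypothetical downloader following the contract described in the source:
#   0 = downloaded, 1 = transient error (retry later), 2 = definitely not here
def example_dl(video: dict, basename: str, output: str) -> int:
    url = "https://example-mirror.invalid/%s.mp4" % video["id"]  # made-up host
    try:
        with urllib.request.urlopen(url) as resp:
            with open("%s.mp4" % basename, "wb") as f:
                f.write(resp.read())
        return 0   # success; the driver stops trying other sources
    except HTTPError:
        return 2   # this mirror definitively does not have the video
    except Exception:
        return 1   # transient failure; the driver may come back to it later
```

The driver in main() simply walks its list of such functions in order, stopping at the first 0 and skipping past any 2.
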
| author | Paper <paper@tflc.us> |
|---|---|
| date | Sat, 30 Aug 2025 17:09:56 -0400 |
| parents | 2e7a3725ad21 |
| children | |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# channeldownloader.py - scrapes youtube videos from a channel from
#                        a variety of sources
# Copyright (c) 2021-2025 Paper <paper@tflc.us>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Usage:
    channeldownloader.py <url>... (--database <file>) [--output <folder>]
    channeldownloader.py -h | --help

Arguments:
    <url>                    YouTube channel URL to download from

Options:
    -h --help                Show this screen
    -o --output <folder>     Output folder, relative to the current
                             directory [default: .]
    -d --database <file>     yt-dlp style database of videos. Should contain
                             an array of yt-dlp .info.json data. For example,
                             FinnOtaku's YTPMV metadata archive.
"""

# Mostly built-in python stuff; docopt is the only hard external
# dependency here (yt-dlp and internetarchive are imported below).
from __future__ import print_function
import docopt
import os
import re
import time
import urllib.request
import ssl
from urllib.error import HTTPError
from pathlib import Path

# We can utilize special simdjson features if it is available
simdjson = False
try:
    import simdjson as json
    simdjson = True
    print("INFO: using simdjson")
except ImportError:
    try:
        import ujson as json
        print("INFO: using ujson")
    except ImportError:
        try:
            import orjson as json
            print("INFO: using orjson")
        except ImportError:
            import json
            print("INFO: using built-in json (slow!)")

ytdlp_works = False
try:
    import yt_dlp as youtube_dl
    from yt_dlp.utils import sanitize_filename, DownloadError
    ytdlp_works = True
except ImportError:
    print("failed to import yt-dlp!")
    print("downloading from YouTube directly will not work.")

ia_works = False
try:
    import internetarchive
    from requests.exceptions import ConnectTimeout
    ia_works = True
except ImportError:
    print("failed to import the Internet Archive's python library!")
    print("downloading from IA will not work.")

##############################################################################
## DOWNLOADERS

# All downloaders should be a function with this signature:
#     dl(video: dict, basename: str, output: str) -> int
# where:
#     'video': the .info.json scraped from the YTPMV metadata archive.
#     'basename': the basename output to write as.
#     'output': the output directory.
# yes, it's weird, but I don't care ;)
#
# Magic return values:
#     0 -- all good, video is downloaded
#     1 -- error downloading video; it may still be available if we try again
#     2 -- video is proved totally unavailable here. give up

# Basic downloader template.
#
# This brute-forces all extensions within vexts and iexts
# in an attempt to find a working video link.
#
# linktemplate is a template to be filled in with the video ID and
# extension.
# For example:
#     https://cdn.ytarchiver.com/%s.%s
def basic_dl_template(video: dict, basename: str, output: str,
                      linktemplate: str, vexts: list, iexts: list) -> int:
    # actual downloader
    def basic_dl_impl(vid: str, ext: str) -> int:
        url = (linktemplate % (vid, ext))
        try:
            with urllib.request.urlopen(url) as headers:
                with open("%s.%s" % (basename, ext), "wb") as f:
                    f.write(headers.read())
                print(" downloaded %s.%s" % (basename, ext))
                return 0
        except TimeoutError:
            return 1
        except HTTPError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!")
            print(e)
            return 1

    for exts in [vexts, iexts]:
        if not exts:
            # nothing to try in this group (e.g. a downloader with no image
            # extensions); don't let the for-else below treat an empty
            # group as "all extensions unavailable"
            continue
        for ext in exts:
            r = basic_dl_impl(video["id"], ext)
            if r == 0:
                break  # done!
            elif r == 1:
                # timeout; try again later?
                return 1
            elif r == 2:
                continue
        else:
            # we did not break out of the loop,
            # which means all extensions were unavailable
            return 2

    # video was downloaded successfully
    return 0


# GhostArchive, basic...
def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
                             "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
                             ["mp4", "webm", "mkv"],
                             []  # none
                             )


# media.desirintoplaisir.net
#
# holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
# or weeb shit, however it seems to be growing to other stuff.
#
# there isn't really a proper API; I've based the scraping off of the HTML
# and the public source code.
def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
    return basic_dl_template(video, basename, output,
                             "https://media.desirintoplaisir.net/content/%s.%s",
                             ["mp4", "webm", "mkv"],
                             ["webp"]
                             )


# Internet Archive's Wayback Machine
#
# Internally, IA's javascript routines forward to the magic
# URL used here.
#
# TODO: Download thumbnails through the CDX API:
#     https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
# the CDX API is pretty slow though, so it should be used as a last resort.
def wayback_dl(video: dict, basename: str, output: str) -> int:
    try:
        url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
               "e.org/yt/%s" % video["id"])
        with urllib.request.urlopen(url) as headers:
            contenttype = headers.getheader("Content-Type")
            if contenttype == "video/webm" or contenttype == "video/mp4":
                ext = contenttype.split("/")[-1]
            else:
                raise HTTPError(url=None, code=None, msg=None, hdrs=None,
                                fp=None)
            with open("%s.%s" % (basename, ext), "wb") as f:
                f.write(headers.read())
            print(" downloaded %s.%s" % (basename, ext))
            return 0
    except TimeoutError:
        return 1
    except HTTPError:
        # don't keep trying
        return 2
    except Exception as e:
        print(" unknown error downloading video!")
        print(e)
        return 1


# Internet Archive (tubeup)
def ia_dl(video: dict, basename: str, output: str) -> int:
    def ia_file_legit(file: internetarchive.File, vidid: str) -> bool:
        # FIXME:
        #
        # There are some items on IA that combine the old tubeup behavior
        # (i.e., including the sanitized video name before the ID)
        # and the new tubeup behavior (filename only contains the video ID),
        # hence we will download the entire video twice.
        #
        # This isn't much of a problem anymore (and hasn't been for like 3
        # years), since I contributed code to not upload something if there
        # is already something there. However we should handle this case
        # anyway.
        #
        # Additionally, there are some items that have duplicate video files
        # (from when the owners changed the title). We should ideally only
        # download unique files. IA seems to provide SHA1 hashes...
        #
        # We should also check whether the copy on IA is higher quality
        # than a local copy... :)
        if not re.search(r"((?:.+?-)?" + vidid + r"\.(?:mp4|jpg|webp|mkv|w"
                         r"ebm|info\.json|description|annotations.xml))",
                         file.name):
            return False
        # now, check the metadata
        print(file)
        return True

    if not internetarchive.get_item("youtube-%s" % video["id"]).exists:
        return 2

    flist = [f.name for f in
             internetarchive.get_files("youtube-%s" % video["id"])
             if ia_file_legit(f, video["id"])]

    while True:
        try:
            internetarchive.download("youtube-%s" % video["id"], files=flist,
                                     verbose=True, destdir=output,
                                     no_directory=True, ignore_existing=True,
                                     retries=9999)
            break
        except ConnectTimeout:
            time.sleep(1)
            continue
        except Exception as e:
            print(e)
            return 1

    # Newer versions of tubeup save only the video ID.
    # Account for this by replacing it.
    #
    # paper/2025-08-30: fixed a bug where video IDs with hyphens
    # would incorrectly truncate
    for fname in flist:
        # ignore any files whose names are not simply the ID
        if os.path.splitext(fname)[0] != video["id"]:
            continue
        if os.path.exists("%s/%s" % (output, fname)):
            # splitext's second element already includes the leading dot
            os.replace("%s/%s" % (output, fname),
                       "%s%s" % (basename, os.path.splitext(fname)[1]))

    return 0


def ytdlp_dl(video: dict, basename: str, output: str) -> int:
    # intentionally ignores all messages besides errors
    class MyLogger(object):
        def debug(self, msg):
            pass

        def warning(self, msg):
            pass

        def error(self, msg):
            print(" " + msg)

    def ytdl_hook(d) -> None:
        if d["status"] == "finished":
            print(" downloaded %s: 100%% " %
                  (os.path.basename(d["filename"])))
        if d["status"] == "downloading":
            print(" downloading %s: %s\r" %
                  (os.path.basename(d["filename"]), d["_percent_str"]),
                  end="")
        if d["status"] == "error":
            print("\n an error occurred downloading %s!" %
                  (os.path.basename(d["filename"])))

    ytdl_opts = {
        "retries": 100,
        "nooverwrites": True,
        "call_home": False,
        "quiet": True,
        "writeinfojson": True,
        "writedescription": True,
        "writethumbnail": True,
        "writeannotations": True,
        "writesubtitles": True,
        "allsubtitles": True,
        "addmetadata": True,
        "continuedl": True,
        "embedthumbnail": True,
        "format": "bestvideo+bestaudio/best",
        "restrictfilenames": True,
        "no_warnings": True,
        "progress_hooks": [ytdl_hook],
        "logger": MyLogger(),
        "ignoreerrors": False,
        # mm, output template
        "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
    }

    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
            return 0
        except DownloadError:
            return 2
        except Exception as e:
            print(" unknown error downloading video!\n")
            print(e)
            return 1


# TODO: There are multiple other youtube archival websites available.
#     Most notable is https://findyoutubevideo.thetechrobo.ca .
#     This combines a lot of sparse youtube archival services, and has
#     a convenient API we can use. Nice!
#
#     There is also the "Distributed YouTube Archive", which is totally
#     useless because there's no way to automate it...

##############################################################################


def main():
    # generator; creates a list of files and returns the parsed form of
    # each.
    # note that the parser is not necessarily the standard json module.
    def load_split_files(path: str):
        list_files = []

        # build the path list
        if not os.path.isdir(path):
            list_files.append(path)
        else:
            for fi in os.listdir(path):
                if re.search(r"vids[0-9\-]+?\.json", fi):
                    list_files.append(path + "/" + fi)

        # now open each as a json
        for fi in list_files:
            print(fi)
            with open(fi, "r", encoding="utf-8") as infile:
                if simdjson:
                    # Using this is a lot faster in SIMDJSON, since instead
                    # of converting all of the JSON key/value pairs into
                    # native Python objects, they stay in an internal state.
                    #
                    # This means we only get the stuff we absolutely need,
                    # which is the uploader ID, and copy everything else
                    # if the ID is one we are looking for.
                    parser = json.Parser()
                    yield parser.parse(infile.read())
                    del parser
                else:
                    yield json.load(infile)

    def write_metadata(i: dict, basename: str) -> None:
        # ehhh
        if not os.path.exists(basename + ".info.json"):
            with open(basename + ".info.json", "w",
                      encoding="utf-8") as jsonfile:
                try:
                    # orjson outputs bytes
                    jsonfile.write(json.dumps(i).decode("utf-8"))
                except AttributeError:
                    # everything else outputs a string
                    jsonfile.write(json.dumps(i))
                print(" saved %s" % os.path.basename(jsonfile.name))

        if not os.path.exists(basename + ".description"):
            with open(basename + ".description", "w",
                      encoding="utf-8") as descfile:
                descfile.write(i["description"])
                print(" saved %s" % os.path.basename(descfile.name))

    args = docopt.docopt(__doc__)

    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])

    channels = dict()

    for url in args["<url>"]:
        chn = url.split("/")[-1]
        channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}

    for channel in channels.values():
        if not os.path.exists(channel["output"]):
            os.mkdir(channel["output"])

    # find videos in the database.
    #
    # despite how it may seem, this is actually really fast, and fairly
    # memory efficient too (but really only if we're using simdjson...)
    videos = [
        i if not simdjson else i.as_dict()
        for f in load_split_files(args["--database"])
        for i in (f if "videos" not in f else f["videos"])
        # logic is reversed kinda, python is weird
        if "uploader_id" in i and i["uploader_id"] in channels
    ]

    while True:
        if len(videos) == 0:
            break

        # iterate over a copy so that removing finished videos from the
        # original list doesn't skip entries mid-iteration
        videos_copy = videos.copy()

        for i in videos_copy:
            channel = channels[i["uploader_id"]]

            # precalculated for speed
            output = channel["output"]

            print("%s:" % i["id"])

            basename = "%s/%s-%s" % (output,
                                     sanitize_filename(i["title"],
                                                       restricted=True),
                                     i["id"])

            files = [y for p in ["mkv", "mp4", "webm"]
                     for y in Path(output).glob(("*-%s." + p) % i["id"])]
            if files:
                print(" video already downloaded!")
                videos.remove(i)
                write_metadata(i, basename)
                continue

            # high level "download" function.
            def dl(video: dict, basename: str, output: str):
                dls = []
                if ytdlp_works:
                    dls.append({
                        "func": ytdlp_dl,
                        "name": "using yt-dlp",
                    })
                if ia_works:
                    dls.append({
                        "func": ia_dl,
                        "name": "from the Internet Archive",
                    })
                dls.append({
                    "func": desirintoplaisir_dl,
                    "name": "from LMIJLM/DJ Plaisir's archive",
                })
                dls.append({
                    "func": ghostarchive_dl,
                    "name": "from GhostArchive",
                })
                dls.append({
                    "func": wayback_dl,
                    "name": "from the Wayback Machine",
                })

                for d in dls:
                    print(" attempting to download %s" % d["name"])
                    r = d["func"](video, basename, output)
                    if r == 0:
                        # all good, video's downloaded
                        return 0
                    elif r == 2:
                        # video is unavailable here
                        print("  oops, video is not available there...")
                        continue
                    elif r == 1:
                        # error while downloading; likely temporary.
                        # TODO we should save which downloader the video
                        # was on, so we can continue back at it later.
                        return 1

                # video is unavailable everywhere
                return 2

            r = dl(i, basename, output)
            if r == 1:
                continue

            # video is downloaded, or it's totally unavailable, so
            # remove it from being checked again.
            videos.remove(i)

            # ... and then dump the metadata, if there isn't any on disk.
            write_metadata(i, basename)

            if r == 0:
                # video is downloaded
                continue

            # video is unavailable; write out the metadata.
            print(" video is unavailable everywhere; dumping out metadata only")


if __name__ == "__main__":
    main()
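
For reference, here is roughly what the script expects to be fed. The record below is a hand-made placeholder, not taken from any real database: real --database files (e.g. the YTPMV metadata archive) contain full yt-dlp .info.json objects, but only the keys shown are actually read by channeldownloader.py, and the channel URL and file names in the comment are likewise hypothetical.

```python
# Minimal, hypothetical database record; all values are placeholders.
example_record = {
    "id": "dQw4w9WgXcQ",                      # YouTube video ID
    "title": "Some video title",              # used for the sanitized basename
    "uploader_id": "SomeChannel",             # matched against the channel URLs
    "description": "Video description text",  # written to the .description file
}

# The --database argument may point at a single JSON file holding a list of
# such records (optionally wrapped as {"videos": [...]}), or at a directory
# whose files match the vids[0-9-]+.json pattern. A matching invocation
# could look like:
#
#   python3 channeldownloader.py https://youtube.com/SomeChannel \
#       --database vids0001.json --output ./archive
```

Note that the last path component of each channel URL is what gets compared against `uploader_id`, so the URL and the record above match each other.
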
