Mercurial > channeldownloader
changeset 18:05e71dd6b6ca default tip
no more ia python library
| author | Paper <paper@tflc.us> |
|---|---|
| date | Sat, 28 Feb 2026 22:31:59 -0500 |
| parents | 0d10b2ce0140 |
| children | |
| files | channeldownloader.py |
| diffstat | 1 files changed, 172 insertions(+), 104 deletions(-) [+] |
line wrap: on
line diff
--- a/channeldownloader.py Sat Feb 28 20:59:59 2026 -0500 +++ b/channeldownloader.py Sat Feb 28 22:31:59 2026 -0500 @@ -17,6 +17,18 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +# Okay, this is a bit of a clusterfuck. +# +# This originated as a script that simply helped me scrape a bunch +# of videos off some deleted channels (in fact, that's still it's main +# purpose) and was very lackluster (hardcoded shite everywhere). +# Fortunately in recent times I've cleaned up the code and added some +# other mirrors, as well as improved the archive.org scraper to not +# shoot itself when it encounters an upload that's not from tubeup. +# +# Nevertheless, I still consider much of this file to be dirty hacks, +# especially some of the HTTP stuff. + """ Usage: channeldownloader.py <url>... (--database <file>) @@ -42,11 +54,13 @@ import re import time import urllib.request +import urllib.parse import os import ssl import io import shutil import xml.etree.ElementTree as XmlET +import enum from urllib.error import HTTPError from pathlib import Path @@ -79,16 +93,6 @@ print("failed to import yt-dlp!") print("downloading from YouTube directly will not work.") -ia_works = False - -try: - import internetarchive - from requests.exceptions import ConnectTimeout - ia_works = True -except ImportError: - print("failed to import the Internet Archive's python library!") - print("downloading from IA will not work.") - zipfile_works = False try: @@ -98,6 +102,7 @@ print("failed to import zipfile!") print("loading the database from a .zip file will not work.") + ############################################################################## ## DOWNLOADERS @@ -108,11 +113,91 @@ # 'basename': the basename output to write as. # 'output': the output directory. 
# yes, it's weird, but I don't care ;)

class DownloaderStatus(enum.Enum):
    # Download finished successfully.
    SUCCESS = 0
    # Download failed.
    # Note that this should NOT be used for when the video is unavailable
    # (i.e. error 404); it should only be used when the video cannot be
    # downloaded *at this time*, indicating a server problem.  This is very
    # common for the Internet Archive, not sure about others.
    ERROR = 1
    # Video is unavailable from this provider.
    UNAVAILABLE = 2


def download_file(url: str, path: str, guessext: bool = False,
                  length: typing.Optional[int] = None) -> DownloaderStatus:
    """
    Downloads a file from `url` to `path`, and prints the progress to the
    screen.

    If `guessext` is True, an extension derived from the response's
    Content-Type header (".mp4" or ".webm") is appended to `path`;
    any other MIME type is treated as a failure.

    If `length` is not given, it is taken from the Content-Length header
    (when the server sends a sane one).

    Returns a DownloaderStatus:
      SUCCESS     -- file fully written to `path`
      UNAVAILABLE -- the server answered with an HTTP error (e.g. 404)
      ERROR       -- timeout or unknown error; retrying later may work
    """
    # Download in 32KiB chunks
    CHUNK_SIZE = 32768

    try:
        with urllib.request.urlopen(url) as http:
            if length is None:
                # Check whether the URL gives us Content-Length.
                # This lets us display how much we've downloaded overall
                # as a percent, and tell the filesystem how big the file
                # will be before we start writing.
                #
                # NOTE: .headers.get() works on every response object
                # urlopen can return (including file://); .getheader()
                # only exists on HTTP responses.
                length = http.headers.get("Content-Length")
                if length is not None:
                    try:
                        length = int(length)
                    except ValueError:
                        # Malformed header; pretend it wasn't there.
                        length = None

            if guessext:
                # Guess file extension from MIME type
                mime = http.headers.get("Content-Type")
                if not mime:
                    return DownloaderStatus.ERROR

                if mime == "video/mp4":
                    path += ".mp4"
                elif mime == "video/webm":
                    path += ".webm"
                else:
                    return DownloaderStatus.ERROR

            # Create the parent directory if needed.  dirname() returns ""
            # for a bare filename, and makedirs("") would raise -- so only
            # create it when there actually is one.
            par = os.path.dirname(path)
            if par and not os.path.isdir(par):
                os.makedirs(par)

            with open(path, "wb") as f:
                if length is not None:
                    # Tell the filesystem up front how much we will be
                    # downloading.  (BUGFIX: this used to run *before*
                    # `f` was opened, so it always failed with NameError
                    # and the bare except silently discarded `length`.)
                    f.truncate(length)

                # Download the entire file
                while True:
                    data = http.read(CHUNK_SIZE)
                    if not data:
                        break

                    f.write(data)
                    # Don't exceed 79 chars.
                    print("\r downloading to %s, " % path, end="")
                    if length:
                        print("%.2f%%" % (f.tell() / length * 100.0), end="")
                    else:
                        print("%.2f MiB" % (f.tell() / (1 << 20)), end="")

                print("\r downloaded to %s " % path)

                if length is not None and length != f.tell():
                    # Server lied about what the length was?
                    print(" INFO: HTTP server's Content-Length header lied??")
    except TimeoutError:
        return DownloaderStatus.ERROR
    except HTTPError:
        # The server gave an explicit HTTP error; don't keep trying.
        return DownloaderStatus.UNAVAILABLE
    except Exception as e:
        print(" unknown error downloading video;", e)
        return DownloaderStatus.ERROR

    return DownloaderStatus.SUCCESS
For example: # https://cdn.ytarchiver.com/%s.%s def basic_dl_template(video: dict, basename: str, output: str, - linktemplate: str, vexts: list, iexts: list) -> int: + linktemplate: str, vexts: list, iexts: list) -> DownloaderStatus: # actual downloader def basic_dl_impl(vid: str, ext: str) -> int: url = (linktemplate % (vid, ext)) - try: - with urllib.request.urlopen(url) as headers: - with open("%s.%s" % (basename, ext), "wb") as f: - f.write(headers.read()) - print(" downloaded %s.%s" % (basename, ext)) - return 0 - except TimeoutError: - return 1 - except HTTPError: - return 2 - except Exception as e: - print(" unknown error downloading video!") - print(e) - return 1 + return download_file(url, "%s.%s" % (basename, ext)) for exts in [vexts, iexts]: for ext in exts: r = basic_dl_impl(video["id"], ext) - if r == 0: + if r == DownloaderStatus.SUCCESS: break # done! - elif r == 1: + elif r == DownloaderStatus.ERROR: # timeout; try again later? - return 1 - elif r == 2: + return DownloaderStatus.ERROR + elif r == DownloaderStatus.UNAVAILABLE: continue else: # we did not break out of the loop # which means all extensions were unavailable - return 2 + return DownloaderStatus.UNAVAILABLE # video was downloaded successfully - return 0 + return DownloaderStatus.SUCCESS # GhostArchive, basic... -def ghostarchive_dl(video: dict, basename: str, output: str) -> int: +def ghostarchive_dl(video: dict, basename: str, output: str) -> DownloaderStatus: return basic_dl_template(video, basename, output, "https://ghostvideo.b-cdn.net/chimurai/%s.%s", ["mp4", "webm", "mkv"], @@ -178,7 +250,7 @@ # # there isn't really a proper API; I've based the scraping off of the HTML # and the public source code. 
-def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int: +def desirintoplaisir_dl(video: dict, basename: str, output: str) -> DownloaderStatus: return basic_dl_template(video, basename, output, "https://media.desirintoplaisir.net/content/%s.%s", ["mp4", "webm", "mkv"], @@ -194,30 +266,9 @@ # TODO: Download thumbnails through the CDX API: # https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py # the CDX API is pretty slow though, so it should be used as a last resort. -def wayback_dl(video: dict, basename: str, output: str) -> int: - try: - url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv" - "e.org/yt/%s" % video["id"]) - with urllib.request.urlopen(url) as headers: - contenttype = headers.getheader("Content-Type") - if contenttype == "video/webm" or contenttype == "video/mp4": - ext = contenttype.split("/")[-1] - else: - raise HTTPError(url=None, code=None, msg=None, - hdrs=None, fp=None) - with open("%s.%s" % (basename, ext), "wb") as f: - f.write(headers.read()) - print(" downloaded %s.%s" % (basename, ext)) - return 0 - except TimeoutError: - return 1 - except HTTPError: - # dont keep trying - return 2 - except Exception as e: - print(" unknown error downloading video!") - print(e) - return 1 +def wayback_dl(video: dict, basename: str, output: str) -> DownloaderStatus: + PREFIX = "https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/" + return download_file(PREFIX + video["id"], basename, True) # Also captures the ID for comparison @@ -225,7 +276,12 @@ # Internet Archive (tubeup) -def ia_dl(video: dict, basename: str, output: str) -> int: +# +# NOTE: We don't actually need the python library anymore; we already +# explicitly download the file listing using our own logic, so there's +# really nothing stopping us from going ahead and downloading everything +# else using the download_file function. 
+def ia_dl(video: dict, basename: str, output: str) -> DownloaderStatus: def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool: # FIXME: # @@ -296,37 +352,51 @@ return None try: + r = [] + # Now parse the XML and make a list of each original file - return [x.attrib["name"] for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d))] + for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d)): + l = {"name": x.attrib["name"]} + + sz = x.find("size") + if sz is not None: + l["size"] = int(sz.text) + + r.append(l) + + return r + except Exception as e: print(e) return None - originalfiles = ia_get_original_files("youtube-%s" % video["id"]) + IA_IDENTIFIER = "youtube-%s" % video["id"] + + originalfiles = ia_get_original_files(IA_IDENTIFIER) if not originalfiles: - return 2 + return DownloaderStatus.UNAVAILABLE flist = [ f for f in originalfiles - if ia_file_legit(f, video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"]) + if ia_file_legit(f["name"], video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"]) ] if not flist: - return 2 # ?????? + return DownloaderStatus.UNAVAILABLE # ?????? - while True: - try: - internetarchive.download("youtube-%s" % video["id"], files=flist, - verbose=True, ignore_existing=True, - retries=9999) - break - except ConnectTimeout: - time.sleep(1) - continue - except Exception as e: - print(e) - return 1 + for i in flist: + for _ in range(1, 10): + path = "%s/%s" % (IA_IDENTIFIER, i["name"]) + r = download_file("https://archive.org/download/" + urllib.parse.quote(path, encoding="utf-8"), path, False, None if not "size" in i else i["size"]) + if r == DownloaderStatus.SUCCESS: + break + elif r == DownloaderStatus.ERROR: + # sleep for a bit and retry + time.sleep(1.0) + continue + elif r == DownloaderStatus.UNAVAILABLE: + return DownloaderStatus.UNAVAILABLE # Newer versions of tubeup save only the video ID. # Account for this by replacing it. 
@@ -336,7 +406,7 @@ # # paper/2026-02-27: an update in the IA python library changed # the way destdir works, so it just gets entirely ignored. - for fname in flist: + for f in flist: def getext(s: str, vidid: str) -> typing.Optional[str]: # special cases for i in [".info.json", ".annotations.xml"]: @@ -354,12 +424,12 @@ return spli[1] - ondisk = "youtube-%s/%s" % (video["id"], fname) + ondisk = "youtube-%s/%s" % (video["id"], f["name"]) if not os.path.exists(ondisk): continue - ext = getext(fname, video["id"]) + ext = getext(f["name"], video["id"]) if ext is None: continue @@ -367,10 +437,10 @@ shutil.rmtree("youtube-%s" % video["id"]) - return 0 + return DownloaderStatus.SUCCESS -def ytdlp_dl(video: dict, basename: str, output: str) -> int: +def ytdlp_dl(video: dict, basename: str, output: str) -> DownloaderStatus: # intentionally ignores all messages besides errors class MyLogger(object): def debug(self, msg): @@ -414,22 +484,21 @@ "progress_hooks": [ytdl_hook], "logger": MyLogger(), "ignoreerrors": False, - - #mm, output template + # yummy "outtmpl": output + "/%(title)s-%(id)s.%(ext)s", } with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: try: ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"]) - return 0 + return DownloaderStatus.SUCCESS except DownloadError: - return 2 + return DownloaderStatus.UNAVAILABLE except Exception as e: print(" unknown error downloading video!\n") print(e) - return 1 + return DownloaderStatus.ERROR # TODO: There are multiple other youtube archival websites available. 
@@ -567,11 +636,10 @@ "name": "using yt-dlp", }) - if ia_works: - dls.append({ - "func": ia_dl, - "name": "from the Internet Archive", - }) + dls.append({ + "func": ia_dl, + "name": "from the Internet Archive", + }) dls.append({ "func": desirintoplaisir_dl, @@ -589,23 +657,23 @@ for dl in dls: print(" attempting to download %s" % dl["name"]) r = dl["func"](i, basename, output) - if r == 0: + if r == DownloaderStatus.SUCCESS: # all good, video's downloaded - return 0 - elif r == 2: + return DownloaderStatus.SUCCESS + elif r == DownloaderStatus.UNAVAILABLE: # video is unavailable here print(" oops, video is not available there...") continue - elif r == 1: + elif r == DownloaderStatus.ERROR: # error while downloading; likely temporary. # TODO we should save which downloader the video # was on, so we can continue back at it later. - return 1 - # video is unavailable everywhere - return 2 + return DownloaderStatus.ERROR + + return DownloaderStatus.UNAVAILABLE r = dl(i, basename, output) - if r == 1: + if r == DownloaderStatus.ERROR: continue # video is downloaded, or it's totally unavailable, so @@ -614,7 +682,7 @@ # ... and then dump the metadata, if there isn't any on disk. write_metadata(i, basename) - if r == 0: + if r == DownloaderStatus.SUCCESS: # video is downloaded continue
