changeset 18:05e71dd6b6ca default tip

no more ia python library
author Paper <paper@tflc.us>
date Sat, 28 Feb 2026 22:31:59 -0500
parents 0d10b2ce0140
children
files channeldownloader.py
diffstat 1 files changed, 172 insertions(+), 104 deletions(-) [+]
line wrap: on
line diff
--- a/channeldownloader.py	Sat Feb 28 20:59:59 2026 -0500
+++ b/channeldownloader.py	Sat Feb 28 22:31:59 2026 -0500
@@ -17,6 +17,18 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+# Okay, this is a bit of a clusterfuck.
+#
+# This originated as a script that simply helped me scrape a bunch
+# of videos off some deleted channels (in fact, that's still its main
+# purpose) and was very lackluster (hardcoded shite everywhere).
+# Fortunately in recent times I've cleaned up the code and added some
+# other mirrors, as well as improved the archive.org scraper to not
+# shoot itself when it encounters an upload that's not from tubeup.
+#
+# Nevertheless, I still consider much of this file to be dirty hacks,
+# especially some of the HTTP stuff.
+
 """
 Usage:
   channeldownloader.py <url>... (--database <file>)
@@ -42,11 +54,13 @@
 import re
 import time
 import urllib.request
+import urllib.parse
 import os
 import ssl
 import io
 import shutil
 import xml.etree.ElementTree as XmlET
+import enum
 from urllib.error import HTTPError
 from pathlib import Path
 
@@ -79,16 +93,6 @@
     print("failed to import yt-dlp!")
     print("downloading from YouTube directly will not work.")
 
-ia_works = False
-
-try:
-    import internetarchive
-    from requests.exceptions import ConnectTimeout
-    ia_works = True
-except ImportError:
-    print("failed to import the Internet Archive's python library!")
-    print("downloading from IA will not work.")
-
 zipfile_works = False
 
 try:
@@ -98,6 +102,7 @@
     print("failed to import zipfile!")
     print("loading the database from a .zip file will not work.")
 
+
 ##############################################################################
 ## DOWNLOADERS
 
@@ -108,11 +113,91 @@
 #    'basename': the basename output to write as.
 #    'output': the output directory.
 # yes, it's weird, but I don't care ;)
-#
-# Magic return values:
-#  0 -- all good, video is downloaded
-#  1 -- error downloading video; it may still be available if we try again
-#  2 -- video is proved totally unavailable here. give up
+
+class DownloaderStatus(enum.Enum):
+    # Download finished successfully.
+    SUCCESS = 0
+    # Download failed.
+    # Note that this should NOT be used for when the video is unavailable
+    # (i.e. error 404); it should only be used when the video cannot be
+    # downloaded *at this time*, indicating a server problem. This is very
+    # common for the Internet Archive, not sure about others.
+    ERROR = 1
+    # Video is unavailable from this provider.
+    UNAVAILABLE = 2
+
+"""
+Downloads a file from `url` to `path`, and prints the progress to the
+screen.
+"""
+def download_file(url: str, path: str, guessext: bool = False, length: int = None) -> DownloaderStatus:
+    # Download in 32KiB chunks
+    CHUNK_SIZE = 32768
+
+    # Keep the progress line under 79 chars.
+    try:
+        with urllib.request.urlopen(url) as http:
+            if length is None:
+                # Check whether the server gives us Content-Length.
+                # If so, we can preallocate the output file before we
+                # start writing (see the f.truncate call after the
+                # file is opened below).
+                #
+                # This is also useful for displaying how much we've
+                # downloaded overall as a percent.
+                length = http.getheader("Content-Length", default=None)
+                try:
+                    if length is not None:
+                        length = int(length)
+                except ValueError:
+                    # Malformed header; pretend it wasn't sent.
+                    length = None
+
+            if guessext:
+                # Guess file extension from MIME type
+                mime = http.getheader("Content-Type", default=None)
+                if not mime:
+                    return DownloaderStatus.ERROR
+
+                if mime == "video/mp4":
+                    path += ".mp4"
+                elif mime == "video/webm":
+                    path += ".webm"
+                else:
+                    return DownloaderStatus.ERROR
+
+            par = os.path.dirname(path)
+            if not os.path.isdir(par):
+                os.makedirs(par)
+
+            with open(path, "wb") as f:
+                if length is not None:
+                    # Preallocate; tells the filesystem the final size.
+                    f.truncate(length)
+                # Download the entire file
+                while True:
+                    data = http.read(CHUNK_SIZE)
+                    if not data:
+                        break
+                    f.write(data)
+                    print("\r downloading to %s, " % path, end="")
+                    if length:
+                        print("%.2f%%" % (f.tell() / length * 100.0), end="")
+                    else:
+                        print("%.2f MiB" % (f.tell() / (1 << 20)), end="")
+                print("\r downloaded to %s        " % path)
+                if length is not None and length != f.tell():
+                    # Server lied about what the length was?
+                    print(" INFO: HTTP server's Content-Length header lied??")
+    except TimeoutError:
+        return DownloaderStatus.ERROR
+    except HTTPError:
+        return DownloaderStatus.UNAVAILABLE
+    except Exception as e:
+        print(" unknown error downloading video;", e)
+        return DownloaderStatus.ERROR
+
+    return DownloaderStatus.SUCCESS
 
 
 # Basic downloader template.
@@ -124,46 +209,33 @@
 # extension. For example:
 #    https://cdn.ytarchiver.com/%s.%s
 def basic_dl_template(video: dict, basename: str, output: str,
-        linktemplate: str, vexts: list, iexts: list) -> int:
+        linktemplate: str, vexts: list, iexts: list) -> DownloaderStatus:
     # actual downloader
     def basic_dl_impl(vid: str, ext: str) -> int:
         url = (linktemplate % (vid, ext))
-        try:
-            with urllib.request.urlopen(url) as headers:
-                with open("%s.%s" % (basename, ext), "wb") as f:
-                    f.write(headers.read())
-            print(" downloaded %s.%s" % (basename, ext))
-            return 0
-        except TimeoutError:
-            return 1
-        except HTTPError:
-            return 2
-        except Exception as e:
-            print(" unknown error downloading video!")
-            print(e)
-            return 1
+        return download_file(url, "%s.%s" % (basename, ext))
 
     for exts in [vexts, iexts]:
         for ext in exts:
             r = basic_dl_impl(video["id"], ext)
-            if r == 0:
+            if r == DownloaderStatus.SUCCESS:
                 break  # done!
-            elif r == 1:
+            elif r == DownloaderStatus.ERROR:
                 # timeout; try again later?
-                return 1
-            elif r == 2:
+                return DownloaderStatus.ERROR
+            elif r == DownloaderStatus.UNAVAILABLE:
                 continue
         else:
             # we did not break out of the loop
             # which means all extensions were unavailable
-            return 2
+            return DownloaderStatus.UNAVAILABLE
 
     # video was downloaded successfully
-    return 0
+    return DownloaderStatus.SUCCESS
 
 
 # GhostArchive, basic...
-def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
+def ghostarchive_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
     return basic_dl_template(video, basename, output,
         "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
         ["mp4", "webm", "mkv"],
@@ -178,7 +250,7 @@
 #
 # there isn't really a proper API; I've based the scraping off of the HTML
 # and the public source code.
-def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
+def desirintoplaisir_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
     return basic_dl_template(video, basename, output,
         "https://media.desirintoplaisir.net/content/%s.%s",
         ["mp4", "webm", "mkv"],
@@ -194,30 +266,9 @@
 # TODO: Download thumbnails through the CDX API:
 # https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
 # the CDX API is pretty slow though, so it should be used as a last resort.
-def wayback_dl(video: dict, basename: str, output: str) -> int:
-    try:
-        url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
-               "e.org/yt/%s" % video["id"])
-        with urllib.request.urlopen(url) as headers:
-            contenttype = headers.getheader("Content-Type")
-            if contenttype == "video/webm" or contenttype == "video/mp4":
-                ext = contenttype.split("/")[-1]
-            else:
-                raise HTTPError(url=None, code=None, msg=None,
-                                hdrs=None, fp=None)
-            with open("%s.%s" % (basename, ext), "wb") as f:
-                f.write(headers.read())
-        print(" downloaded %s.%s" % (basename, ext))
-        return 0
-    except TimeoutError:
-        return 1
-    except HTTPError:
-        # dont keep trying
-        return 2
-    except Exception as e:
-        print(" unknown error downloading video!")
-        print(e)
-        return 1
+def wayback_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
+    PREFIX = "https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/"
+    return download_file(PREFIX + video["id"], basename, True)
 
 
 # Also captures the ID for comparison
@@ -225,7 +276,12 @@
 
 
 # Internet Archive (tubeup)
-def ia_dl(video: dict, basename: str, output: str) -> int:
+#
+# NOTE: We don't actually need the python library anymore; we already
+# explicitly download the file listing using our own logic, so there's
+# really nothing stopping us from going ahead and downloading everything
+# else using the download_file function.
+def ia_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
     def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool:
         # FIXME:
         #
@@ -296,37 +352,51 @@
             return None
 
         try:
+            r = []
+
             # Now parse the XML and make a list of each original file
-            return [x.attrib["name"] for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d))]
+            for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d)):
+                l = {"name": x.attrib["name"]}
+
+                sz = x.find("size")
+                if sz is not None:
+                    l["size"] = int(sz.text)
+
+                r.append(l)
+
+            return r
+
         except Exception as e:
             print(e)
             return None
 
-    originalfiles = ia_get_original_files("youtube-%s" % video["id"])
+    IA_IDENTIFIER = "youtube-%s" % video["id"]
+
+    originalfiles = ia_get_original_files(IA_IDENTIFIER)
     if not originalfiles:
-        return 2
+        return DownloaderStatus.UNAVAILABLE
 
     flist = [
         f
         for f in originalfiles
-        if ia_file_legit(f, video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"])
+        if ia_file_legit(f["name"], video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"])
     ]
 
     if not flist:
-        return 2 # ??????
+        return DownloaderStatus.UNAVAILABLE # ??????
 
-    while True:
-        try:
-            internetarchive.download("youtube-%s" % video["id"], files=flist,
-                                     verbose=True, ignore_existing=True,
-                                     retries=9999)
-            break
-        except ConnectTimeout:
-            time.sleep(1)
-            continue
-        except Exception as e:
-            print(e)
-            return 1
+    for i in flist:
+        for _ in range(1, 10):
+            path = "%s/%s" % (IA_IDENTIFIER, i["name"])
+            r = download_file("https://archive.org/download/" + urllib.parse.quote(path, encoding="utf-8"), path, False, None if not "size" in i else i["size"])
+            if r == DownloaderStatus.SUCCESS:
+                break
+            elif r == DownloaderStatus.ERROR:
+                # sleep for a bit and retry
+                time.sleep(1.0)
+                continue
+            elif r == DownloaderStatus.UNAVAILABLE:
+                return DownloaderStatus.UNAVAILABLE
 
     # Newer versions of tubeup save only the video ID.
     # Account for this by replacing it.
@@ -336,7 +406,7 @@
     #
     # paper/2026-02-27: an update in the IA python library changed
     # the way destdir works, so it just gets entirely ignored.
-    for fname in flist:
+    for f in flist:
         def getext(s: str, vidid: str) -> typing.Optional[str]:
             # special cases
             for i in [".info.json", ".annotations.xml"]:
@@ -354,12 +424,12 @@
 
             return spli[1]
 
-        ondisk = "youtube-%s/%s" % (video["id"], fname)
+        ondisk = "youtube-%s/%s" % (video["id"], f["name"])
 
         if not os.path.exists(ondisk):
             continue
 
-        ext = getext(fname, video["id"])
+        ext = getext(f["name"], video["id"])
         if ext is None:
             continue
 
@@ -367,10 +437,10 @@
 
     shutil.rmtree("youtube-%s" % video["id"])
 
-    return 0
+    return DownloaderStatus.SUCCESS
 
 
-def ytdlp_dl(video: dict, basename: str, output: str) -> int:
+def ytdlp_dl(video: dict, basename: str, output: str) -> DownloaderStatus:
     # intentionally ignores all messages besides errors
     class MyLogger(object):
         def debug(self, msg):
@@ -414,22 +484,21 @@
         "progress_hooks": [ytdl_hook],
         "logger": MyLogger(),
         "ignoreerrors": False,
-
-        #mm, output template
+        # yummy
         "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
     }
 
     with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
         try:
             ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
-            return 0
+            return DownloaderStatus.SUCCESS
         except DownloadError:
-            return 2
+            return DownloaderStatus.UNAVAILABLE
         except Exception as e:
             print(" unknown error downloading video!\n")
             print(e)
 
-    return 1
+    return DownloaderStatus.ERROR
 
 
 # TODO: There are multiple other youtube archival websites available.
@@ -567,11 +636,10 @@
                         "name": "using yt-dlp",
                     })
 
-                if ia_works:
-                    dls.append({
-                        "func": ia_dl,
-                        "name": "from the Internet Archive",
-                    })
+                dls.append({
+                    "func": ia_dl,
+                    "name": "from the Internet Archive",
+                })
 
                 dls.append({
                     "func": desirintoplaisir_dl,
@@ -589,23 +657,23 @@
                 for dl in dls:
                     print(" attempting to download %s" % dl["name"])
                     r = dl["func"](i, basename, output)
-                    if r == 0:
+                    if r == DownloaderStatus.SUCCESS:
                         # all good, video's downloaded
-                        return 0
-                    elif r == 2:
+                        return DownloaderStatus.SUCCESS
+                    elif r == DownloaderStatus.UNAVAILABLE:
                         # video is unavailable here
                         print(" oops, video is not available there...")
                         continue
-                    elif r == 1:
+                    elif r == DownloaderStatus.ERROR:
                         # error while downloading; likely temporary.
                         # TODO we should save which downloader the video
                         # was on, so we can continue back at it later.
-                        return 1
-                # video is unavailable everywhere
-                return 2
+                        return DownloaderStatus.ERROR
+
+                return DownloaderStatus.UNAVAILABLE
 
             r = dl(i, basename, output)
-            if r == 1:
+            if r == DownloaderStatus.ERROR:
                 continue
 
             # video is downloaded, or it's totally unavailable, so
@@ -614,7 +682,7 @@
             # ... and then dump the metadata, if there isn't any on disk.
             write_metadata(i, basename)
 
-            if r == 0:
+            if r == DownloaderStatus.SUCCESS:
                 # video is downloaded
                 continue