diff channeldownloader.py @ 118:eac6dae753ca

*: major cleanup
committer: GitHub <noreply@github.com>
author Paper <37962225+mrpapersonic@users.noreply.github.com>
date Fri, 03 Mar 2023 22:51:28 +0000
parents 80bd4a99ea00
children 196cf2e3d96e
line wrap: on
line diff
--- a/channeldownloader.py	Fri Mar 03 22:33:53 2023 +0000
+++ b/channeldownloader.py	Fri Mar 03 22:51:28 2023 +0000
@@ -25,24 +25,14 @@
 import os
 import re
 import time
-try:
-    import urllib.request as compat_urllib
-    from urllib.error import HTTPError
-except ImportError:  # Python 2
-    import urllib as compat_urllib
-    from urllib2 import HTTPError
-try:
-    import yt_dlp as youtube_dl
-    from yt_dlp.utils import sanitize_filename, DownloadError
-except ImportError:
-    try:
-        import youtube_dl
-        from youtube_dl.utils import sanitize_filename, DownloadError
-    except ImportError:
-        print("ERROR: youtube-dl/yt-dlp not installed!")
-        exit(1)
-from io import open  # for Python 2 compatibility, in Python 3 this
-                     # just maps to the built-in function
+import urllib.request
+import requests  # need this for ONE (1) exception
+import yt_dlp as youtube_dl
+from urllib.error import HTTPError
+from yt_dlp.utils import sanitize_filename, DownloadError
+from pathlib import Path
+from requests.exceptions import ConnectTimeout
+
 
 class MyLogger(object):
     def debug(self, msg):
@@ -55,15 +45,19 @@
         print(" " + msg)
         pass
 
-def ytdl_hook(d):
+
+def ytdl_hook(d) -> None:
     if d["status"] == "finished":
         print(" downloaded %s:    100%% " % (os.path.basename(d["filename"])))
     if d["status"] == "downloading":
-        print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), d["_percent_str"]), end="")
+        print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
+                                         d["_percent_str"]), end="")
     if d["status"] == "error":
-        print("\n an error occurred downloading %s!" % (os.path.basename(d["filename"])))
+        print("\n an error occurred downloading %s!"
+              % (os.path.basename(d["filename"])))
 
-def load_split_files(path):
+
+def load_split_files(path: str) -> dict:
     if os.path.isdir(path):
         result = {"videos": []}
         for fi in os.listdir(path):
@@ -75,16 +69,30 @@
     else:
         return json.loads(open(path, "r", encoding="utf-8").read())
 
-def reporthook(count, block_size, total_size):
+
+def reporthook(count: int, block_size: int, total_size: int) -> None:
     global start_time
     if count == 0:
         start_time = time.time()
         return
-    duration = time.time() - start_time
     percent = int(count * block_size * 100 / total_size)
     print(" downloading %d%%        \r" % (percent), end="")
 
-args = docopt.docopt(__doc__)
+
+def write_metadata(i: dict, basename: str) -> None:
+    if not os.path.exists(basename + ".info.json"):
+        with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
+            try:
+                jsonfile.write(json.dumps(i).decode("utf-8"))
+            except AttributeError:
+                jsonfile.write(json.dumps(i))
+            print(" saved %s" % os.path.basename(jsonfile.name))
+    if not os.path.exists(basename + ".description"):
+        with open(basename + ".description", "w",
+                  encoding="utf-8") as descfile:
+            descfile.write(i["description"])
+            print(" saved %s" % os.path.basename(descfile.name))
+
 
 ytdl_opts = {
     "retries": 100,
@@ -97,7 +105,6 @@
     "writeannotations": True,
     "writesubtitles": True,
     "allsubtitles": True,
-    "ignoreerrors": True,
     "addmetadata": True,
     "continuedl": True,
     "embedthumbnail": True,
@@ -109,89 +116,117 @@
     "ignoreerrors": False,
 }
 
-if not os.path.exists(args["--output"]):
-    os.mkdir(args["--output"])
 
-for i in load_split_files(args["--database"])["videos"]:
-    uploader = i["uploader_id"] if "uploader_id" in i else None
-    for url in args["<url>"]:
-        channel = url.split("/")[-1]
-
-        output = "%s/%s" % (args["--output"], channel)
-        if not os.path.exists(output):
-            os.mkdir(output)
-        ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"
-        
+def wayback_machine_dl(video: dict, basename: str) -> int:
+    try:
+        url = ''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu",
+                       "rl.archive.org/yt/%s"])
+        headers = urllib.request.urlopen(url % video["id"])
+        contenttype = headers.getheader("Content-Type")
+        if contenttype == "video/webm":
+            ext = "webm"
+        elif contenttype == "video/mp4":
+            ext = "mp4"
+        else:
+            raise HTTPError(url=None, code=None, msg=None,
+                            hdrs=None, fp=None)
+        urllib.request.urlretrieve(url % video["id"], "%s.%s" % (basename, ext),
+                                   reporthook)
+        print(" downloaded %s.%s" % (basename, ext))
+        return 0
+    except TimeoutError:
+        return 1
+    except HTTPError:
+        print(" video not available on the Wayback Machine!")
+        return 0
+    except Exception as e:
+        print(" unknown error downloading video!\n")
+        print(e)
+        return 0
 
-        if uploader == channel:
-            print("%s:" % i["id"])
-            # :skull:
-            # todo: put this in a function?
-            if any(x in os.listdir(output) for x in [sanitize_filename("%s-%s.mp4"  % (i["title"], i["id"]), restricted=True),
-                                                     sanitize_filename("%s-%s.mkv"  % (i["title"], i["id"]), restricted=True),
-                                                     sanitize_filename("%s-%s.webm" % (i["title"], i["id"]), restricted=True)]):
-                print(" video already downloaded!")
+def internet_archive_dl(video: dict, basename: str) -> int:
+    if internetarchive.get_item("youtube-%s" % video["id"]).exists:
+        fnames = [f.name for f in internetarchive.get_files(
+                                  "youtube-%s" % video["id"])]
+        flist = []
+        for fname in range(len(fnames)):
+            if re.search(''.join([r"((?:.+?-)?", video["id"],
+                                  r"\.(?:mp4|jpg|webp|mkv|webm|info\\.json|des"
+                                  r"cription|annotations.xml))"]),
+                                 fnames[fname]):
+                flist.append(fnames[fname])
+        while True:
+            try:
+                internetarchive.download("youtube-%s" % video["id"],
+                                         files=flist, verbose=True,
+                                         destdir=output,
+                                         no_directory=True,
+                                         ignore_existing=True,
+                                         retries=9999)
+                break
+            except ConnectTimeout:
                 continue
-            # this code is *really* ugly... todo a rewrite?
-            with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
-                try:
-                    result = ytdl.extract_info("https://youtube.com/watch?v=%s" % i["id"])
-                    continue
-                except DownloadError:
-                    print(" video is not available! attempting to find Internet Archive pages of it...")
-                except Exception as e:
-                    print(" unknown error downloading video!\n")
-                    print(e)
-            if internetarchive.get_item("youtube-%s" % i["id"]).exists:  # download from internetarchive if available
-                fnames = [f.name for f in internetarchive.get_files("youtube-%s" % i["id"])]
-                flist = []
-                for fname in range(len(fnames)):
-                    if re.search("((?:.+?-)?%s\.(?:mp4|jpg|webp|mkv|webm|info\.json|description))" % (i["id"]), fnames[fname]):
-                        flist.append(fnames[fname])
-                if len(flist) >= 1:
-                    internetarchive.download("youtube-%s" % i["id"], files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True, retries=9999)
-                else:
+            except Exception:
+                return 0
+        if flist[0][:len(video["id"])] == video["id"]:
+            for fname in flist:
+                if os.path.exists("%s/%s" % (output, fname)):
+                    os.replace("%s/%s" % (output, fname),
+                               "%s-%s" % (basename.rsplit("-", 1)[0],
+                                          fname))
+        return 1
+    return 0
+
+def main():
+    args = docopt.docopt(__doc__)
+
+    if not os.path.exists(args["--output"]):
+        os.mkdir(args["--output"])
+
+    for i in load_split_files(args["--database"])["videos"]:
+        uploader = i["uploader_id"] if "uploader_id" in i else None
+        for url in args["<url>"]:
+            channel = url.split("/")[-1]
+
+            output = "%s/%s" % (args["--output"], channel)
+            if not os.path.exists(output):
+                os.mkdir(output)
+            ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"
+
+            if uploader == channel:
+                print("%s:" % i["id"])
+                basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
+                                         restricted=True), i["id"])
+                path = Path(output)
+                files = list(path.glob("*-%s.mkv" % i["id"]))
+                files.extend(list(path.glob("*-%s.mp4" % i["id"])))
+                files.extend(list(path.glob("*-%s.webm" % i["id"])))
+                if files:
                     print(" video already downloaded!")
+                    write_metadata(i, basename)
                     continue
-                if os.path.exists("%s/%s.info.json" % (output, i["id"])):  # will always exist no matter which setting was used to download
-                    for fname in flist:
-                        if os.path.exists(output + "/" + fname) and not os.path.exists(output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname):
-                            os.rename(output + "/" + fname, output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname)
-                else:
-                    print("ID file not found!")
-            else:
-                print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...")
-                try:  # we could use yt-dlp's extractor, but then we would need to craft a fake wayback machine url,
-                      # and we wouldn't even know if it worked. so let's continue using our little "hack"
-                    headers = compat_urllib.urlopen("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"])
-                    if hasattr(headers.info(), "getheader"):
-                        contenttype = headers.info().getheader("Content-Type")
-                    else:
-                        contenttype = headers.getheader("Content-Type")
-                    if contenttype == "video/webm":
-                        ext = "webm"
-                    elif contenttype == "video/mp4":
-                        ext = "mp4"
-                    else:
-                        raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None)
-                    compat_urllib.urlretrieve("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"], "%s/%s-%s.%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"], ext), reporthook)
-                    print(" downloaded %s-%s.%s" % (sanitize_filename(i["title"], restricted=True), i["id"], ext))
-                except HTTPError:
-                    print(" video not available on the Wayback Machine!")
-                except Exception as e:
-                    print(" unknown error downloading video!\n")
-                    print(e)
-            # metadata
-            basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"])
-            if not os.path.exists(basename + ".info.json"):
-                with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
+                # this code is *really* ugly... todo a rewrite?
+                with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
                     try:
-                        jsonfile.write(json.dumps(i).decode("utf-8"))
-                    except AttributeError:
-                        jsonfile.write(json.dumps(i))
-                    print(" saved %s" % os.path.basename(jsonfile.name))
-            if not os.path.exists(basename + ".description"):
-                with open(basename + ".description", "w", encoding="utf-8") as descfile:
-                    descfile.write(i["description"])
-                    print(" saved %s" % os.path.basename(descfile.name))
+                        ytdl.extract_info("https://youtube.com/watch?v=%s"
+                                          % i["id"])
+                        continue
+                    except DownloadError:
+                        print(" video is not available! attempting to find In"
+                              "ternet Archive pages of it...")
+                    except Exception as e:
+                        print(" unknown error downloading video!\n")
+                        print(e)
+                if internet_archive_dl(i, basename) == 0:  # if we can't download from IA
+                    print(" video does not have a Internet Archive page! attem"
+                          "pting to download from the Wayback Machine...")
+                    while True:
+                        if wayback_machine_dl(i, basename) == 0:  # success
+                            break
+                        time.sleep(5)
+                        continue
+                write_metadata(i, basename)
 
+
+if __name__ == "__main__":
+    main()