Mercurial > codedump
diff channeldownloader.py @ 118:eac6dae753ca
*: major cleanup
committer: GitHub <noreply@github.com>
author:    Paper <37962225+mrpapersonic@users.noreply.github.com>
date:      Fri, 03 Mar 2023 22:51:28 +0000
parents:   80bd4a99ea00
children:  196cf2e3d96e
line wrap: on
line diff
--- a/channeldownloader.py Fri Mar 03 22:33:53 2023 +0000 +++ b/channeldownloader.py Fri Mar 03 22:51:28 2023 +0000 @@ -25,24 +25,14 @@ import os import re import time -try: - import urllib.request as compat_urllib - from urllib.error import HTTPError -except ImportError: # Python 2 - import urllib as compat_urllib - from urllib2 import HTTPError -try: - import yt_dlp as youtube_dl - from yt_dlp.utils import sanitize_filename, DownloadError -except ImportError: - try: - import youtube_dl - from youtube_dl.utils import sanitize_filename, DownloadError - except ImportError: - print("ERROR: youtube-dl/yt-dlp not installed!") - exit(1) -from io import open # for Python 2 compatibility, in Python 3 this - # just maps to the built-in function +import urllib.request +import requests # need this for ONE (1) exception +import yt_dlp as youtube_dl +from urllib.error import HTTPError +from yt_dlp.utils import sanitize_filename, DownloadError +from pathlib import Path +from requests.exceptions import ConnectTimeout + class MyLogger(object): def debug(self, msg): @@ -55,15 +45,19 @@ print(" " + msg) pass -def ytdl_hook(d): + +def ytdl_hook(d) -> None: if d["status"] == "finished": print(" downloaded %s: 100%% " % (os.path.basename(d["filename"]))) if d["status"] == "downloading": - print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), d["_percent_str"]), end="") + print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), + d["_percent_str"]), end="") if d["status"] == "error": - print("\n an error occurred downloading %s!" % (os.path.basename(d["filename"]))) + print("\n an error occurred downloading %s!" 
+ % (os.path.basename(d["filename"]))) -def load_split_files(path): + +def load_split_files(path: str) -> dict: if os.path.isdir(path): result = {"videos": []} for fi in os.listdir(path): @@ -75,16 +69,30 @@ else: return json.loads(open(path, "r", encoding="utf-8").read()) -def reporthook(count, block_size, total_size): + +def reporthook(count: int, block_size: int, total_size: int) -> None: global start_time if count == 0: start_time = time.time() return - duration = time.time() - start_time percent = int(count * block_size * 100 / total_size) print(" downloading %d%% \r" % (percent), end="") -args = docopt.docopt(__doc__) + +def write_metadata(i: dict, basename: str) -> None: + if not os.path.exists(basename + ".info.json"): + with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile: + try: + jsonfile.write(json.dumps(i).decode("utf-8")) + except AttributeError: + jsonfile.write(json.dumps(i)) + print(" saved %s" % os.path.basename(jsonfile.name)) + if not os.path.exists(basename + ".description"): + with open(basename + ".description", "w", + encoding="utf-8") as descfile: + descfile.write(i["description"]) + print(" saved %s" % os.path.basename(descfile.name)) + ytdl_opts = { "retries": 100, @@ -97,7 +105,6 @@ "writeannotations": True, "writesubtitles": True, "allsubtitles": True, - "ignoreerrors": True, "addmetadata": True, "continuedl": True, "embedthumbnail": True, @@ -109,89 +116,117 @@ "ignoreerrors": False, } -if not os.path.exists(args["--output"]): - os.mkdir(args["--output"]) -for i in load_split_files(args["--database"])["videos"]: - uploader = i["uploader_id"] if "uploader_id" in i else None - for url in args["<url>"]: - channel = url.split("/")[-1] - - output = "%s/%s" % (args["--output"], channel) - if not os.path.exists(output): - os.mkdir(output) - ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s" - +def wayback_machine_dl(video: dict, basename: str) -> int: + try: + url = 
''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu", + "rl.archive.org/yt/%s"]) + headers = urllib.request.urlopen(url % video["id"]) + contenttype = headers.getheader("Content-Type") + if contenttype == "video/webm": + ext = "webm" + elif contenttype == "video/mp4": + ext = "mp4" + else: + raise HTTPError(url=None, code=None, msg=None, + hdrs=None, fp=None) + urllib.request.urlretrieve(url % video["id"], "%s.%s" % (basename, ext), + reporthook) + print(" downloaded %s.%s" % (basename, ext)) + return 0 + except TimeoutError: + return 1 + except HTTPError: + print(" video not available on the Wayback Machine!") + return 0 + except Exception as e: + print(" unknown error downloading video!\n") + print(e) + return 0 - if uploader == channel: - print("%s:" % i["id"]) - # :skull: - # todo: put this in a function? - if any(x in os.listdir(output) for x in [sanitize_filename("%s-%s.mp4" % (i["title"], i["id"]), restricted=True), - sanitize_filename("%s-%s.mkv" % (i["title"], i["id"]), restricted=True), - sanitize_filename("%s-%s.webm" % (i["title"], i["id"]), restricted=True)]): - print(" video already downloaded!") +def internet_archive_dl(video: dict, basename: str) -> int: + if internetarchive.get_item("youtube-%s" % video["id"]).exists: + fnames = [f.name for f in internetarchive.get_files( + "youtube-%s" % video["id"])] + flist = [] + for fname in range(len(fnames)): + if re.search(''.join([r"((?:.+?-)?", video["id"], + r"\.(?:mp4|jpg|webp|mkv|webm|info\\.json|des" + r"cription|annotations.xml))"]), + fnames[fname]): + flist.append(fnames[fname]) + while True: + try: + internetarchive.download("youtube-%s" % video["id"], + files=flist, verbose=True, + destdir=output, + no_directory=True, + ignore_existing=True, + retries=9999) + break + except ConnectTimeout: continue - # this code is *really* ugly... todo a rewrite? 
- with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: - try: - result = ytdl.extract_info("https://youtube.com/watch?v=%s" % i["id"]) - continue - except DownloadError: - print(" video is not available! attempting to find Internet Archive pages of it...") - except Exception as e: - print(" unknown error downloading video!\n") - print(e) - if internetarchive.get_item("youtube-%s" % i["id"]).exists: # download from internetarchive if available - fnames = [f.name for f in internetarchive.get_files("youtube-%s" % i["id"])] - flist = [] - for fname in range(len(fnames)): - if re.search("((?:.+?-)?%s\.(?:mp4|jpg|webp|mkv|webm|info\.json|description))" % (i["id"]), fnames[fname]): - flist.append(fnames[fname]) - if len(flist) >= 1: - internetarchive.download("youtube-%s" % i["id"], files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True, retries=9999) - else: + except Exception: + return 0 + if flist[0][:len(video["id"])] == video["id"]: + for fname in flist: + if os.path.exists("%s/%s" % (output, fname)): + os.replace("%s/%s" % (output, fname), + "%s-%s" % (basename.rsplit("-", 1)[0], + fname)) + return 1 + return 0 + +def main(): + args = docopt.docopt(__doc__) + + if not os.path.exists(args["--output"]): + os.mkdir(args["--output"]) + + for i in load_split_files(args["--database"])["videos"]: + uploader = i["uploader_id"] if "uploader_id" in i else None + for url in args["<url>"]: + channel = url.split("/")[-1] + + output = "%s/%s" % (args["--output"], channel) + if not os.path.exists(output): + os.mkdir(output) + ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s" + + if uploader == channel: + print("%s:" % i["id"]) + basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], + restricted=True), i["id"]) + path = Path(output) + files = list(path.glob("*-%s.mkv" % i["id"])) + files.extend(list(path.glob("*-%s.mp4" % i["id"]))) + files.extend(list(path.glob("*-%s.webm" % i["id"]))) + if files: print(" video already downloaded!") + 
write_metadata(i, basename) continue - if os.path.exists("%s/%s.info.json" % (output, i["id"])): # will always exist no matter which setting was used to download - for fname in flist: - if os.path.exists(output + "/" + fname) and not os.path.exists(output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname): - os.rename(output + "/" + fname, output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname) - else: - print("ID file not found!") - else: - print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...") - try: # we could use yt-dlp's extractor, but then we would need to craft a fake wayback machine url, - # and we wouldn't even know if it worked. so let's continue using our little "hack" - headers = compat_urllib.urlopen("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"]) - if hasattr(headers.info(), "getheader"): - contenttype = headers.info().getheader("Content-Type") - else: - contenttype = headers.getheader("Content-Type") - if contenttype == "video/webm": - ext = "webm" - elif contenttype == "video/mp4": - ext = "mp4" - else: - raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None) - compat_urllib.urlretrieve("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"], "%s/%s-%s.%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"], ext), reporthook) - print(" downloaded %s-%s.%s" % (sanitize_filename(i["title"], restricted=True), i["id"], ext)) - except HTTPError: - print(" video not available on the Wayback Machine!") - except Exception as e: - print(" unknown error downloading video!\n") - print(e) - # metadata - basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"]) - if not os.path.exists(basename + ".info.json"): - with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile: + # this code is *really* ugly... todo a rewrite? 
+ with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: try: - jsonfile.write(json.dumps(i).decode("utf-8")) - except AttributeError: - jsonfile.write(json.dumps(i)) - print(" saved %s" % os.path.basename(jsonfile.name)) - if not os.path.exists(basename + ".description"): - with open(basename + ".description", "w", encoding="utf-8") as descfile: - descfile.write(i["description"]) - print(" saved %s" % os.path.basename(descfile.name)) + ytdl.extract_info("https://youtube.com/watch?v=%s" + % i["id"]) + continue + except DownloadError: + print(" video is not available! attempting to find In" + "ternet Archive pages of it...") + except Exception as e: + print(" unknown error downloading video!\n") + print(e) + if internet_archive_dl(i, basename) == 0: # if we can't download from IA + print(" video does not have a Internet Archive page! attem" + "pting to download from the Wayback Machine...") + while True: + if wayback_machine_dl(i, basename) == 0: # success + break + time.sleep(5) + continue + write_metadata(i, basename) + +if __name__ == "__main__": + main()