view channeldownloader.py @ 124:c29589c45d4a
Add systemd timer for kmbscreens bot
| author    | Paper <37962225+mrpapersonic@users.noreply.github.com> |
|-----------|---------------------------------------------------------|
| committer | GitHub <noreply@github.com>                             |
| date      | Sun, 23 Apr 2023 16:36:29 -0400                         |
| parents   | 8ec0e91a5dcf                                            |
| children  |                                                         |
line source
```python
#!/usr/bin/env python3
"""
Usage:
    channeldownloader.py <url>... (--database <file>) [--output <folder>]
    channeldownloader.py -h | --help

Arguments:
    <url>                   YouTube channel URL to download from

Options:
    -h --help               Show this screen
    -o --output <folder>    Output folder, relative to the current directory
                            [default: .]
    -d --database <file>    YTPMV_Database compatible JSON file
"""
from __future__ import print_function
import docopt
import internetarchive
try:
    import orjson as json
except ImportError:
    import json
import os
import re
import time
import urllib.request
import requests  # need this for ONE (1) exception
import yt_dlp as youtube_dl
from urllib.error import HTTPError
from yt_dlp.utils import sanitize_filename, DownloadError
from pathlib import Path
from requests.exceptions import ConnectTimeout


class MyLogger(object):
    """Silences yt-dlp's own logging; only errors are printed, indented."""

    def debug(self, msg):
        pass

    def warning(self, msg):
        pass

    def error(self, msg):
        print(" " + msg)


def ytdl_hook(d) -> None:
    # Progress hook handed to yt-dlp via ytdl_opts["progress_hooks"].
    if d["status"] == "finished":
        print(" downloaded %s: 100%% " % os.path.basename(d["filename"]))
    if d["status"] == "downloading":
        print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
                                         d["_percent_str"]), end="")
    if d["status"] == "error":
        print("\n an error occurred downloading %s!"
              % os.path.basename(d["filename"]))


def load_split_files(path: str):
    # Yield the parsed JSON of a single database file, or of every
    # "vids*.json" chunk inside a database directory.
    if not os.path.isdir(path):
        with open(path, "r", encoding="utf-8") as infile:
            yield json.load(infile)
        return
    for fi in os.listdir(path):
        if re.search(r"vids[0-9\-]+?\.json", fi):
            with open(path + "/" + fi, "r", encoding="utf-8") as infile:
                print(fi)
                yield json.load(infile)


def reporthook(count: int, block_size: int, total_size: int) -> None:
    # Progress callback for urllib.request.urlretrieve().
    global start_time
    if count == 0:
        start_time = time.time()
        return
    percent = int(count * block_size * 100 / total_size)
    print(" downloading %d%% \r" % percent, end="")


def write_metadata(i: dict, basename: str) -> None:
    # Write <basename>.info.json and <basename>.description if missing.
    if not os.path.exists(basename + ".info.json"):
        with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
            try:
                # orjson.dumps() returns bytes; the stdlib json returns str.
                jsonfile.write(json.dumps(i).decode("utf-8"))
            except AttributeError:
                jsonfile.write(json.dumps(i))
            print(" saved %s" % os.path.basename(jsonfile.name))
    if not os.path.exists(basename + ".description"):
        with open(basename + ".description", "w",
                  encoding="utf-8") as descfile:
            descfile.write(i["description"])
            print(" saved %s" % os.path.basename(descfile.name))


def wayback_machine_dl(video: dict, basename: str) -> int:
    # Returns 1 if the request timed out (the caller retries), 0 otherwise.
    try:
        url = ("https://web.archive.org/web/2oe_/"
               "http://wayback-fakeurl.archive.org/yt/%s")
        headers = urllib.request.urlopen(url % video["id"])
        contenttype = headers.getheader("Content-Type")
        if contenttype == "video/webm":
            ext = "webm"
        elif contenttype == "video/mp4":
            ext = "mp4"
        else:
            raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None)
        urllib.request.urlretrieve(url % video["id"],
                                   "%s.%s" % (basename, ext), reporthook)
        print(" downloaded %s.%s" % (basename, ext))
        return 0
    except TimeoutError:
        return 1
    except HTTPError:
        print(" video not available on the Wayback Machine!")
        return 0
    except Exception as e:
        print(" unknown error downloading video!\n")
        print(e)
        return 0


def ia_file_legit(path: str, vidid: str) -> bool:
    # Keep only Internet Archive files that belong to this video ID and
    # carry an extension we care about.
    pattern = (r"((?:.+?-)?" + vidid +
               r"\.(?:mp4|jpg|webp|mkv|webm|info\.json|description|"
               r"annotations\.xml))")
    return bool(re.search(pattern, path))


def internet_archive_dl(video: dict, basename: str, output: str) -> int:
    # Returns 1 if an archive.org item existed and its files were fetched,
    # 0 otherwise.
    if internetarchive.get_item("youtube-%s" % video["id"]).exists:
        flist = [f.name for f in
                 internetarchive.get_files("youtube-%s" % video["id"])
                 if ia_file_legit(f.name, video["id"])]
        while True:
            try:
                internetarchive.download("youtube-%s" % video["id"],
                                         files=flist, verbose=True,
                                         destdir=output, no_directory=True,
                                         ignore_existing=True, retries=9999)
                break
            except ConnectTimeout:
                continue
            except Exception as e:
                print(e)
                return 0
        if flist and flist[0][:len(video["id"])] == video["id"]:
            # Files named by bare video ID are renamed to the local
            # "<title>-<id>" naming scheme.
            for fname in flist:
                if os.path.exists("%s/%s" % (output, fname)):
                    os.replace("%s/%s" % (output, fname),
                               "%s-%s" % (basename.rsplit("-", 1)[0], fname))
        return 1
    return 0


ytdl_opts = {
    "retries": 100,
    "nooverwrites": True,
    "call_home": False,
    "quiet": True,
    "writeinfojson": True,
    "writedescription": True,
    "writethumbnail": True,
    "writeannotations": True,
    "writesubtitles": True,
    "allsubtitles": True,
    "addmetadata": True,
    "continuedl": True,
    "embedthumbnail": True,
    "format": "bestvideo+bestaudio/best",
    "restrictfilenames": True,
    "no_warnings": True,
    "progress_hooks": [ytdl_hook],
    "logger": MyLogger(),
    "ignoreerrors": False,
}


def main():
    args = docopt.docopt(__doc__)
    if not os.path.exists(args["--output"]):
        os.mkdir(args["--output"])
    for f in load_split_files(args["--database"]):
        for i in f:
            uploader = i["uploader_id"] if "uploader_id" in i else None
            for url in args["<url>"]:
                channel = url.split("/")[-1]
                output = "%s/%s" % (args["--output"], channel)
                if not os.path.exists(output):
                    os.mkdir(output)
                ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s"
                if uploader == channel:
                    print(uploader, channel)
                    print("%s:" % i["id"])
                    basename = "%s/%s-%s" % (
                        output,
                        sanitize_filename(i["title"], restricted=True),
                        i["id"])
                    files = [y for p in ["mkv", "mp4", "webm"]
                             for y in Path(output).glob(("*-%s." + p)
                                                        % i["id"])]
                    if files:
                        print(" video already downloaded!")
                        write_metadata(i, basename)
                        continue
                    # this code is *really* ugly... todo a rewrite?
                    with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
                        try:
                            ytdl.extract_info("https://youtube.com/watch?v=%s"
                                              % i["id"])
                            continue
                        except DownloadError:
                            print(" video is not available! attempting to "
                                  "find Internet Archive pages of it...")
                        except Exception as e:
                            print(" unknown error downloading video!\n")
                            print(e)
                    if internet_archive_dl(i, basename, output):
                        continue
                    # if we can't download from IA, fall back to the
                    # Wayback Machine
                    print(" video does not have an Internet Archive page! "
                          "attempting to download from the Wayback Machine...")
                    while True:
                        # retry only when wayback_machine_dl() reports a
                        # timeout (return value 1)
                        if wayback_machine_dl(i, basename) == 0:
                            break
                        time.sleep(5)
                    write_metadata(i, basename)


if __name__ == "__main__":
    main()
```