Mercurial > codedump
view channeldownloader.py @ 64:1508aee998df
Add Windows 95 Keygen GUI
author | Paper <mrpapersonic@gmail.com> |
---|---|
date | Mon, 25 Apr 2022 02:10:41 -0400 |
parents | c615532e6572 |
children | 9636d5dee08c |
line wrap: on
line source
# Downloads the videos of one YouTube channel that are listed in a YTPMV JSON
# database. Each video is tried on YouTube first (via yt-dlp), then on the
# Internet Archive ("youtube-<id>" items), and finally on the Wayback Machine.
import argparse
import glob
import itertools
import json
import os
import re
import urllib.request
from urllib.error import HTTPError

import internetarchive  # pip install internetarchive
import yt_dlp  # pip install yt-dlp
from yt_dlp.utils import sanitize_filename


class MyLogger(object):
    """Logger handed to yt-dlp that discards every message."""

    def debug(self, msg):
        pass

    def warning(self, msg):
        pass

    def error(self, msg):
        pass


def matroska_find(filelist):
    """Return True if any name in *filelist* has a .mkv or .webm extension."""
    return any(os.path.splitext(name)[1] in (".mkv", ".webm") for name in filelist)


def ytdl_hook(d):
    """Print a one-line status message for a yt-dlp progress event *d*."""
    status = d["status"]
    if status == "finished":
        print(" downloaded {0}: 100% ".format(os.path.basename(d["filename"])))
    elif status == "downloading":
        print(" downloading {0}: {1}\r".format(os.path.basename(d["filename"]), d["_percent_str"]), end="")
    elif status == "error":
        # The message used to be printed without .format(), showing a literal
        # "{0}". NOTE(review): "filename" may be absent on error events, hence
        # the fallback - confirm against the yt-dlp progress hook documentation.
        print(" an error occurred downloading {0}!".format(os.path.basename(d.get("filename") or "video")))


def load_split_files(path):
    """Load the video database found at *path*.

    If *path* is a directory, the "videos" lists of every ``vids*.json`` file
    in it are merged and returned as ``{"videos": [...]}``. Otherwise *path*
    is parsed as a single JSON file and its content is returned unchanged.
    """
    if os.path.isdir(path):
        result = {"videos": []}
        for part in glob.glob(os.path.join(path, "vids*.json")):
            with open(part, "r", encoding="utf-8") as infile:
                result["videos"].extend(json.load(infile)["videos"])
        return result
    # json.load() reads from the file object; the previous json.loads(open(...))
    # passed the file object itself and raised TypeError.
    with open(path, "r", encoding="utf-8") as infile:
        return json.load(infile)


def _is_already_downloaded(video_id, directory):
    """Return True if *directory* holds a ``*-<video_id>.info.json`` file."""
    marker = "-" + video_id + ".info.json"
    return any(name.endswith(".json") and marker in name for name in os.listdir(directory))


def _select_archive_files(fnames, video_id):
    """Return the names from an Internet Archive item that should be downloaded.

    Skips IA-generated bookkeeping files, anything inside a sub-directory, the
    derived thumbnail/.ogv files, and either every .mp4 (when a .mkv/.webm is
    present in the item) or only the ``.ia.mp4`` derivative (when not).
    """
    # IA-created files we don't need.
    disallowednames = {
        "__ia_thumb.jpg",
        "youtube-{0}_archive.torrent".format(video_id),
        "youtube-{0}_files.xml".format(video_id),
        "youtube-{0}_meta.sqlite".format(video_id),
        "youtube-{0}_meta.xml".format(video_id),
    }
    derived_suffixes = ("{0}_thumb.jpg".format(video_id), "{0}.ogv".format(video_id))
    # Loop-invariant, so decide once which .mp4 variant is redundant.
    skipped_suffix = ".mp4" if matroska_find(fnames) else ".ia.mp4"
    return [
        fname
        for fname in fnames
        if not fname.endswith(skipped_suffix)
        and "/" not in fname
        and fname not in disallowednames
        and not fname.endswith(derived_suffixes)
    ]


def _download_from_internet_archive(video, output):
    """Download the ``youtube-<id>`` Internet Archive item of *video* into *output*.

    Downloaded files are renamed to ``<sanitized title>-<original name>``.
    """
    identifier = "youtube-{0}".format(video["id"])
    fnames = [f.name for f in internetarchive.get_files(identifier)]
    flist = _select_archive_files(fnames, video["id"])
    if not flist:
        print(" video already downloaded!")
        return
    internetarchive.download(identifier, files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True)
    # NOTE(review): indentation was lost in the source this was recovered from;
    # "ID file not found!" is assumed to pair with this existence check.
    if not os.path.exists(os.path.join(output, video["id"] + ".info.json")):
        print("ID file not found!")
        return
    prefix = sanitize_filename(video["title"], restricted=True) + "-"
    for fname in flist:
        source = os.path.join(output, fname)
        target = os.path.join(output, prefix + fname)
        if os.path.exists(source) and not os.path.exists(target):
            os.rename(source, target)


def _download_from_wayback(video, output):
    """Try to fetch *video* from the Wayback Machine and write its metadata.

    The ``.info.json`` file is written from the database entry whether or not
    the video download itself succeeded.
    NOTE: only tested with youtube links after polymer, however SHOULD work
    with links created before then.
    """
    url = "https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/{0}".format(video["id"])
    basename = "{0}-{1}".format(sanitize_filename(video["title"], restricted=True), video["id"])
    try:
        # Close the probe response instead of leaking the connection.
        with urllib.request.urlopen(url) as response:
            contenttype = response.getheader("Content-Type")
        ext = "webm" if contenttype == "video/webm" else "mp4"
        urllib.request.urlretrieve(url, os.path.join(output, "{0}.{1}".format(basename, ext)))
        print(" downloaded {0}.{1}".format(basename, ext))
    except HTTPError:
        print(" video not available on the Wayback Machine!")
    except Exception as e:
        print(" unknown error downloading video!")
        print(e)
    # metadata
    # NOTE(review): assumed to belong to the Wayback branch only, since the
    # Internet Archive branch downloads its own .info.json - confirm.
    with open(os.path.join(output, basename + ".info.json"), "w", encoding="utf-8") as jsonfile:
        json.dump(video, jsonfile)
    print(" saved {0}.info.json".format(basename))


parser = argparse.ArgumentParser(description="Downloads (deleted) videos from YTPMV creators")
parser.add_argument("-c", "--channel", help="channel URL", metavar='<url>', required=True)
parser.add_argument("-d", "--database", help="json database (https://finnrepo.a2hosted.com/YTPMV_Database)", metavar='<path>', required=True)
parser.add_argument("-o", "--output", help="output directory, defaults to the channel ID", metavar='<output>')
args = parser.parse_args()

if args.channel.startswith(("https://", "http://")):
    # Strip a trailing slash first, otherwise the last path component is "".
    channel = args.channel.rstrip("/").split("/")[-1]
else:
    channel = args.channel
output = args.output if args.output else channel
os.makedirs(output, exist_ok=True)

ytdl_opts = {
    "outtmpl": "{0}/%(title)s-%(id)s.%(ext)s".format(output),
    "retries": 100,
    "nooverwrites": True,
    "call_home": False,
    "quiet": True,
    "writeinfojson": True,
    "writedescription": True,
    "writethumbnail": True,
    "writeannotations": True,
    "writesubtitles": True,
    "allsubtitles": True,
    "addmetadata": True,
    "continuedl": True,
    "embedthumbnail": True,
    "format": "bestvideo+bestaudio/best",
    "restrictfilenames": True,
    "no_warnings": True,
    "progress_hooks": [ytdl_hook],
    "logger": MyLogger(),
    # Must stay False: a failed YouTube download has to raise so the loop
    # below can fall back to the archives. (The key used to be listed twice,
    # True then False; the later value was the effective one.)
    "ignoreerrors": False,
}

# Iterate over the video entries; looping over the returned dict itself only
# yields its keys, so no video was ever matched.
for video in load_split_files(args.database)["videos"]:
    try:
        uploader = video["uploader_id"]
    except (KeyError, TypeError):
        uploader = "unknown"
    if uploader != channel:
        continue
    print("{0}:".format(video["id"]))
    if _is_already_downloaded(video["id"], output):
        print(" video already downloaded!")
        continue
    with yt_dlp.YoutubeDL(ytdl_opts) as ytdl:
        try:
            ytdl.download(["https://youtube.com/watch?v={0}".format(video["id"])])
            continue
        except Exception:
            # Deliberately broad: any failure means "try the archives instead".
            print(" video is not available! attempting to find Internet Archive pages of it...")
    if internetarchive.get_item("youtube-{0}".format(video["id"])).exists:
        # download from internetarchive if available
        _download_from_internet_archive(video, output)
    else:
        # download the vid from waybackmachine
        print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...")
        _download_from_wayback(video, output)