Mercurial > codedump
view channeldownloader.py @ 84:1eb7d6d7be1d
blog stuff
committer: GitHub <noreply@github.com>
author | Paper <37962225+mrpapersonic@users.noreply.github.com> |
---|---|
date | Thu, 28 Jul 2022 16:50:25 -0400 |
parents | eafe13de3f76 |
children | 80bd4a99ea00 |
line wrap: on
line source
#!/usr/bin/env python3 # # download deleted vids from old yt channels # script by paper # it's pretty old and could definitely use some refining from __future__ import print_function import argparse import internetarchive try: import orjson as json except ImportError: import json import os import re import time try: import urllib.request as compat_urllib from urllib.error import HTTPError except ImportError: # Python 2 import urllib as compat_urllib from urllib2 import HTTPError try: import yt_dlp as youtube_dl from yt_dlp.utils import sanitize_filename except ImportError: try: import youtube_dl from youtube_dl.utils import sanitize_filename except ImportError: print("ERROR: youtube-dl/yt-dlp not installed!") exit(1) from io import open # for Python 2 compatibility, in Python 3 this # just maps to the built-in function class MyLogger(object): def debug(self, msg): pass def warning(self, msg): pass def error(self, msg): pass def ytdl_hook(d): if d["status"] == "finished": print(" downloaded %s: 100% " % (os.path.basename(d["filename"]))) if d["status"] == "downloading": print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), d["_percent_str"]), end="") if d["status"] == "error": print(" an error occurred downloading {0}!") def load_split_files(path): if os.path.isdir(path): result = {"videos": []} for fi in os.listdir(path): for f in re.findall(r"vids.+?\.json", fi): with open(path + "/" + f, "r", encoding="utf-8") as infile: for i in json.loads(infile.read())["videos"]: result["videos"].append(i) return result else: return json.loads(open(path, "r", encoding="utf-8").read()) def reporthook(count, block_size, total_size): global start_time if count == 0: start_time = time.time() return duration = time.time() - start_time percent = int(count * block_size * 100 / total_size) print(" downloading %d%% \r" % (percent), end="") parser = argparse.ArgumentParser(description="Downloads (deleted) videos from YTPMV creators") parser.add_argument("-c", "--channel", help="channel URL", metavar="<url>", required=True) parser.add_argument("-d", "--database", help="json database (https://finnrepo.a2hosted.com/YTPMV_Database)", metavar="<path>", required=True) parser.add_argument("-o", "--output", help="output directory, defaults to the channel ID", metavar="<output>") args = parser.parse_args() if args.channel[:8] == "https://" or args.channel[:7] == "http://": channel = args.channel.split("/")[-1] else: channel = args.channel if args.output: output = args.output else: output = channel if not os.path.exists(output): os.mkdir(output) ytdl_opts = { "outtmpl": output + "/%(title)s-%(id)s.%(ext)s", "retries": 100, "nooverwrites": True, "call_home": False, "quiet": True, "writeinfojson": True, "writedescription": True, "writethumbnail": True, "writeannotations": True, "writesubtitles": True, "allsubtitles": True, "ignoreerrors": True, "addmetadata": True, "continuedl": True, "embedthumbnail": True, "format": "bestvideo+bestaudio/best", "restrictfilenames": True, "no_warnings": True, "progress_hooks": [ytdl_hook], "logger": MyLogger(), "ignoreerrors": False, } for i in load_split_files(args.database)["videos"]: uploader = i["uploader_id"] if "uploader_id" in i else None if uploader == channel: print("%s:" % i["id"]) # :skull: # todo: put this in a function? if any(x in os.listdir(output) for x in [sanitize_filename(i["title"] + "-" + i["id"] + ".mp4", restricted=True), sanitize_filename(i["title"] + "-" + i["id"] + ".mkv", restricted=True), sanitize_filename(i["title"] + "-" + i["id"] + ".webm", restricted=True)]): print(" video already downloaded!") continue # this code is *really* ugly... todo a rewrite? with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: try: result = ytdl.download(["https://youtube.com/watch?v=%s" % i["id"]]) # TODO: add check for existing downloaded items and don't download them continue except Exception: print(" video is not available! attempting to find Internet Archive pages of it...") if internetarchive.get_item("youtube-%s" % i["id"]).exists: # download from internetarchive if available fnames = [f.name for f in internetarchive.get_files("youtube-%s" % i["id"])] flist = [] for fname in range(len(fnames)): if re.search("((?:.+?-)?%s\.(?:mp4|jpg|webp|mkv|webm|info\.json|description))" % (i["id"]), fnames[fname]): flist.append(fnames[fname]) if len(flist) >= 1: internetarchive.download("youtube-%s" % i["id"], files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True) else: print(" video already downloaded!") continue if os.path.exists(output + "/" + i["id"] + ".info.json"): # will always exist no matter which setting was used to download for fname in flist: if os.path.exists(output + "/" + fname) and not os.path.exists(output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname): os.rename(output + "/" + fname, output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname) else: print("ID file not found!") else: print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...") try: # we could use yt-dlp's extractor, but then we would need to craft a fake wayback machine url, # and we wouldn't even know if it worked. so let's continue using our little "hack" headers = compat_urllib.urlopen("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"]) if hasattr(headers.info(), "getheader"): contenttype = headers.info().getheader("Content-Type") else: contenttype = headers.getheader("Content-Type") if contenttype == "video/webm": ext = "webm" elif contenttype == "video/mp4": ext = "mp4" else: raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None) compat_urllib.urlretrieve("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"], "%s/%s-%s.%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"], ext), reporthook) print(" downloaded %s-%s.%s" % (sanitize_filename(i["title"], restricted=True), i["id"], ext)) except HTTPError: print(" video not available on the Wayback Machine!") except Exception as e: print(" unknown error downloading video!\n") print(e) # metadata with open("%s/%s-%s.info.json" % (output, sanitize_filename(i["title"], restricted=True), i["id"]), "w", encoding="utf-8") as jsonfile: jsonfile.write(json.dumps(i).decode("utf-8")) print(" saved %s" % os.path.basename(jsonfile.name))