Mercurial > codedump
view channeldownloader.py @ 117:40a7b6d9bd3b
officially deprecate kemonopartydownloader.py
committer: GitHub <noreply@github.com>
author:   Paper <37962225+mrpapersonic@users.noreply.github.com>
date:     Fri, 03 Mar 2023 22:33:53 +0000
parents:  80bd4a99ea00
children: eac6dae753ca
line wrap: on
line source
#!/usr/bin/env python3 """ Usage: channeldownloader.py <url>... (--database <file>) [--output <folder>] [--proxy <proxy>] channeldownloader.py -h | --help Arguments: <url> YouTube channel URL to download from Options: -h --help Show this screen -o --output <folder> Output folder, relative to the current directory [default: .] -d --database <file> HTTP or HTTPS proxy (SOCKS5 with PySocks) """ from __future__ import print_function import docopt import internetarchive try: import orjson as json except ImportError: import json import os import re import time try: import urllib.request as compat_urllib from urllib.error import HTTPError except ImportError: # Python 2 import urllib as compat_urllib from urllib2 import HTTPError try: import yt_dlp as youtube_dl from yt_dlp.utils import sanitize_filename, DownloadError except ImportError: try: import youtube_dl from youtube_dl.utils import sanitize_filename, DownloadError except ImportError: print("ERROR: youtube-dl/yt-dlp not installed!") exit(1) from io import open # for Python 2 compatibility, in Python 3 this # just maps to the built-in function class MyLogger(object): def debug(self, msg): pass def warning(self, msg): pass def error(self, msg): print(" " + msg) pass def ytdl_hook(d): if d["status"] == "finished": print(" downloaded %s: 100%% " % (os.path.basename(d["filename"]))) if d["status"] == "downloading": print(" downloading %s: %s\r" % (os.path.basename(d["filename"]), d["_percent_str"]), end="") if d["status"] == "error": print("\n an error occurred downloading %s!" 
% (os.path.basename(d["filename"]))) def load_split_files(path): if os.path.isdir(path): result = {"videos": []} for fi in os.listdir(path): for f in re.findall(r"vids[0-9\-]+?\.json", fi): with open(path + "/" + f, "r", encoding="utf-8") as infile: jsonnn = json.loads(infile.read()) result["videos"].extend(jsonnn) return result else: return json.loads(open(path, "r", encoding="utf-8").read()) def reporthook(count, block_size, total_size): global start_time if count == 0: start_time = time.time() return duration = time.time() - start_time percent = int(count * block_size * 100 / total_size) print(" downloading %d%% \r" % (percent), end="") args = docopt.docopt(__doc__) ytdl_opts = { "retries": 100, "nooverwrites": True, "call_home": False, "quiet": True, "writeinfojson": True, "writedescription": True, "writethumbnail": True, "writeannotations": True, "writesubtitles": True, "allsubtitles": True, "ignoreerrors": True, "addmetadata": True, "continuedl": True, "embedthumbnail": True, "format": "bestvideo+bestaudio/best", "restrictfilenames": True, "no_warnings": True, "progress_hooks": [ytdl_hook], "logger": MyLogger(), "ignoreerrors": False, } if not os.path.exists(args["--output"]): os.mkdir(args["--output"]) for i in load_split_files(args["--database"])["videos"]: uploader = i["uploader_id"] if "uploader_id" in i else None for url in args["<url>"]: channel = url.split("/")[-1] output = "%s/%s" % (args["--output"], channel) if not os.path.exists(output): os.mkdir(output) ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s" if uploader == channel: print("%s:" % i["id"]) # :skull: # todo: put this in a function? if any(x in os.listdir(output) for x in [sanitize_filename("%s-%s.mp4" % (i["title"], i["id"]), restricted=True), sanitize_filename("%s-%s.mkv" % (i["title"], i["id"]), restricted=True), sanitize_filename("%s-%s.webm" % (i["title"], i["id"]), restricted=True)]): print(" video already downloaded!") continue # this code is *really* ugly... 
todo a rewrite? with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: try: result = ytdl.extract_info("https://youtube.com/watch?v=%s" % i["id"]) continue except DownloadError: print(" video is not available! attempting to find Internet Archive pages of it...") except Exception as e: print(" unknown error downloading video!\n") print(e) if internetarchive.get_item("youtube-%s" % i["id"]).exists: # download from internetarchive if available fnames = [f.name for f in internetarchive.get_files("youtube-%s" % i["id"])] flist = [] for fname in range(len(fnames)): if re.search("((?:.+?-)?%s\.(?:mp4|jpg|webp|mkv|webm|info\.json|description))" % (i["id"]), fnames[fname]): flist.append(fnames[fname]) if len(flist) >= 1: internetarchive.download("youtube-%s" % i["id"], files=flist, verbose=True, destdir=output, no_directory=True, ignore_existing=True, retries=9999) else: print(" video already downloaded!") continue if os.path.exists("%s/%s.info.json" % (output, i["id"])): # will always exist no matter which setting was used to download for fname in flist: if os.path.exists(output + "/" + fname) and not os.path.exists(output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname): os.rename(output + "/" + fname, output + "/" + sanitize_filename(i["title"], restricted=True) + "-" + fname) else: print("ID file not found!") else: print(" video does not have a Internet Archive page! attempting to download from the Wayback Machine...") try: # we could use yt-dlp's extractor, but then we would need to craft a fake wayback machine url, # and we wouldn't even know if it worked. 
so let's continue using our little "hack" headers = compat_urllib.urlopen("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"]) if hasattr(headers.info(), "getheader"): contenttype = headers.info().getheader("Content-Type") else: contenttype = headers.getheader("Content-Type") if contenttype == "video/webm": ext = "webm" elif contenttype == "video/mp4": ext = "mp4" else: raise HTTPError(url=None, code=None, msg=None, hdrs=None, fp=None) compat_urllib.urlretrieve("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archive.org/yt/%s" % i["id"], "%s/%s-%s.%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"], ext), reporthook) print(" downloaded %s-%s.%s" % (sanitize_filename(i["title"], restricted=True), i["id"], ext)) except HTTPError: print(" video not available on the Wayback Machine!") except Exception as e: print(" unknown error downloading video!\n") print(e) # metadata basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], restricted=True), i["id"]) if not os.path.exists(basename + ".info.json"): with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile: try: jsonfile.write(json.dumps(i).decode("utf-8")) except AttributeError: jsonfile.write(json.dumps(i)) print(" saved %s" % os.path.basename(jsonfile.name)) if not os.path.exists(basename + ".description"): with open(basename + ".description", "w", encoding="utf-8") as descfile: descfile.write(i["description"]) print(" saved %s" % os.path.basename(descfile.name))