comparison channeldownloader.py @ 14:03c8fd4069fb default tip

*: big refactor, switch to GPLv2, and add README Okay: we now use a modular approach for downloaders. Each downloader is provided through a single function (which does the fetching). Additionally, the internetarchive library is now optional, in case the user does not want to install it. yt-dlp is still necessary though, for its sanitize_filename function. If and when I get to adding vanity features (such as finding the best possible source by comparing resolution and bitrate), I'll probably separate out all of the downloaders into different files. I also moved this project to a separate repository from 'codedump', keeping all of the relevant commit history :)
author Paper <paper@tflc.us>
date Sat, 30 Aug 2025 17:09:56 -0400
parents 2e7a3725ad21
children
comparison
equal deleted inserted replaced
13:2e7a3725ad21 14:03c8fd4069fb
1 #!/usr/bin/env python3 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # channeldownloader.py - scrapes a youtube channel's videos from
4 # a variety of sources
5
6 # Copyright (c) 2021-2025 Paper <paper@tflc.us>
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
2 """ 20 """
3 Usage: 21 Usage:
4 channeldownloader.py <url>... (--database <file>) 22 channeldownloader.py <url>... (--database <file>)
5 [--output <folder>] 23 [--output <folder>]
6 channeldownloader.py -h | --help 24 channeldownloader.py -h | --help
10 28
11 Options: 29 Options:
12 -h --help Show this screen 30 -h --help Show this screen
13 -o --output <folder> Output folder, relative to the current directory 31 -o --output <folder> Output folder, relative to the current directory
14 [default: .] 32 [default: .]
15 -d --database <file> YTPMV_Database compatible JSON file 33 -d --database <file> yt-dlp style database of videos. Should contain
34 an array of yt-dlp .info.json data. For example,
35 FinnOtaku's YTPMV metadata archive.
16 """ 36 """
37
38 # Required imports: docopt, plus built-in python stuff
17 from __future__ import print_function 39 from __future__ import print_function
18 import docopt 40 import docopt
19 import internetarchive
20 try:
21 import orjson as json
22 except ImportError:
23 import json
24 import os 41 import os
25 import re 42 import re
26 import time 43 import time
27 import urllib.request 44 import urllib.request
28 import requests # need this for ONE (1) exception 45 import os
29 import yt_dlp as youtube_dl 46 import ssl
30 from urllib.error import HTTPError 47 from urllib.error import HTTPError
31 from yt_dlp.utils import sanitize_filename, DownloadError
32 from pathlib import Path 48 from pathlib import Path
33 from requests.exceptions import ConnectTimeout 49
34 50 # We can utilize special simdjson features if it is available
35 51 simdjson = False
36 class MyLogger(object): 52
37 def debug(self, msg): 53 try:
38 pass 54 import simdjson as json
39 55 simdjson = True
40 def warning(self, msg): 56 print("INFO: using simdjson")
41 pass 57 except ImportError:
42
43 def error(self, msg):
44 print(" " + msg)
45 pass
46
47
48 def ytdl_hook(d) -> None:
49 if d["status"] == "finished":
50 print(" downloaded %s: 100%% " % (os.path.basename(d["filename"])))
51 if d["status"] == "downloading":
52 print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
53 d["_percent_str"]), end="")
54 if d["status"] == "error":
55 print("\n an error occurred downloading %s!"
56 % (os.path.basename(d["filename"])))
57
58
59 def load_split_files(path: str):
60 if not os.path.isdir(path):
61 yield json.load(open(path, "r", encoding="utf-8"))
62 for fi in os.listdir(path):
63 if re.search(r"vids[0-9\-]+?\.json", fi):
64 with open(path + "/" + fi, "r", encoding="utf-8") as infile:
65 print(fi)
66 yield json.load(infile)
67
68
69 def reporthook(count: int, block_size: int, total_size: int) -> None:
70 global start_time
71 if count == 0:
72 start_time = time.time()
73 return
74 percent = int(count * block_size * 100 / total_size)
75 print(" downloading %d%% \r" % (percent), end="")
76
77
78 def write_metadata(i: dict, basename: str) -> None:
79 if not os.path.exists(basename + ".info.json"):
80 with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
81 try:
82 jsonfile.write(json.dumps(i).decode("utf-8"))
83 except AttributeError:
84 jsonfile.write(json.dumps(i))
85 print(" saved %s" % os.path.basename(jsonfile.name))
86 if not os.path.exists(basename + ".description"):
87 with open(basename + ".description", "w",
88 encoding="utf-8") as descfile:
89 descfile.write(i["description"])
90 print(" saved %s" % os.path.basename(descfile.name))
91
92
93 def wayback_machine_dl(video: dict, basename: str) -> int:
94 try: 58 try:
95 url = ''.join(["https://web.archive.org/web/2oe_/http://wayback-fakeu", 59 import ujson as json
96 "rl.archive.org/yt/%s"]) 60 print("INFO: using ujson")
97 headers = urllib.request.urlopen(url % video["id"]) 61 except ImportError:
98 contenttype = headers.getheader("Content-Type") 62 try:
99 if contenttype == "video/webm": 63 import orjson as json
100 ext = "webm" 64 print("INFO: using orjson")
101 elif contenttype == "video/mp4": 65 except ImportError:
102 ext = "mp4" 66 import json
67 print("INFO: using built-in json (slow!)")
68
69 ytdlp_works = False
70
71 try:
72 import yt_dlp as youtube_dl
73 from yt_dlp.utils import sanitize_filename, DownloadError
74 ytdlp_works = True
75 except ImportError:
76 print("failed to import yt-dlp!")
77 print("downloading from YouTube directly will not work.")
78
79 ia_works = False
80
81 try:
82 import internetarchive
83 from requests.exceptions import ConnectTimeout
84 ia_works = True
85 except ImportError:
86 print("failed to import the Internet Archive's python library!")
87 print("downloading from IA will not work.")
88
89 ##############################################################################
90 ## DOWNLOADERS
91
92 # All downloaders should be a function with this signature:
93 # dl(video: dict, basename: str, output: str) -> int
94 # where:
95 # 'video': the .info.json scraped from the YTPMV metadata archive.
96 # 'basename': the basename output to write as.
97 # 'output': the output directory.
98 # yes, it's weird, but I don't care ;)
99 #
100 # Magic return values:
101 # 0 -- all good, video is downloaded
102 # 1 -- error downloading video; it may still be available if we try again
103 # 2 -- video is proven totally unavailable here; give up
104
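# For illustration, a minimal downloader satisfying this contract. This is
# a sketch only: mirror.example.invalid is a placeholder host, not a real
# service.
def example_mirror_dl(video: dict, basename: str, output: str) -> int:
    url = "https://mirror.example.invalid/%s.mp4" % video["id"]
    try:
        with urllib.request.urlopen(url) as resp:
            with open("%s.mp4" % basename, "wb") as f:
                f.write(resp.read())
        return 0  # all good, video is downloaded
    except HTTPError:
        return 2  # this source definitively lacks the video
    except Exception:
        return 1  # transient failure; worth retrying later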
105
106 # Basic downloader template.
107 #
108 # This does a brute-force of all extensions within vexts and iexts
109 # in an attempt to find a working video link.
110 #
111 # linktemplate is a template filled in with the video ID and
112 # extension. For example:
113 # https://cdn.ytarchiver.com/%s.%s
114 def basic_dl_template(video: dict, basename: str, output: str,
115 linktemplate: str, vexts: list, iexts: list) -> int:
116 # actual downloader
117 def basic_dl_impl(vid: str, ext: str) -> int:
118 url = (linktemplate % (vid, ext))
119 try:
120 with urllib.request.urlopen(url) as headers:
121 with open("%s.%s" % (basename, ext), "wb") as f:
122 f.write(headers.read())
123 print(" downloaded %s.%s" % (basename, ext))
124 return 0
125 except TimeoutError:
126 return 1
127 except HTTPError:
128 return 2
129 except Exception as e:
130 print(" unknown error downloading video!")
131 print(e)
132 return 1
133
134 for exts in [vexts, iexts]:
135 for ext in exts:
136 r = basic_dl_impl(video["id"], ext)
137 if r == 0:
138 break # done!
139 elif r == 1:
140 # timeout; try again later?
141 return 1
142 elif r == 2:
143 continue
103 else: 144 else:
104 raise HTTPError(url=None, code=None, msg=None, 145 # nothing in this group downloaded; a missing
105 hdrs=None, fp=None) 146 # thumbnail is fine, but no video means unavailable
106 urllib.request.urlretrieve(url % video["id"], "%s.%s" % (basename, ext), 147 if exts is vexts: return 2
107 reporthook) 148
149 # video was downloaded successfully
150 return 0
151
152
153 # GhostArchive, basic...
154 def ghostarchive_dl(video: dict, basename: str, output: str) -> int:
155 return basic_dl_template(video, basename, output,
156 "https://ghostvideo.b-cdn.net/chimurai/%s.%s",
157 ["mp4", "webm", "mkv"],
158 [] # none
159 )
160
161
162 # media.desirintoplaisir.net
163 #
164 # holds PRIMARILY popular videos (i.e. no niche internet microcelebrities)
165 # or weeb shit; however, it seems to be growing to include other stuff.
166 #
167 # there isn't really a proper API; I've based the scraping off of the HTML
168 # and the public source code.
169 def desirintoplaisir_dl(video: dict, basename: str, output: str) -> int:
170 return basic_dl_template(video, basename, output,
171 "https://media.desirintoplaisir.net/content/%s.%s",
172 ["mp4", "webm", "mkv"],
173 ["webp"]
174 )
175
176
177 # Internet Archive's Wayback Machine
178 #
179 # Internally, IA's javascript routines forward to the magic
180 # URL used here.
181 #
182 # TODO: Download thumbnails through the CDX API:
183 # https://github.com/TheTechRobo/youtubevideofinder/blob/master/lostmediafinder/finder.py
184 # the CDX API is pretty slow though, so it should be used as a last resort.
185 def wayback_dl(video: dict, basename: str, output: str) -> int:
186 try:
187 url = ("https://web.archive.org/web/2oe_/http://wayback-fakeurl.archiv"
188 "e.org/yt/%s" % video["id"])
189 with urllib.request.urlopen(url) as headers:
190 contenttype = headers.getheader("Content-Type")
191 if contenttype == "video/webm" or contenttype == "video/mp4":
192 ext = contenttype.split("/")[-1]
193 else:
194 raise HTTPError(url=None, code=None, msg=None,
195 hdrs=None, fp=None)
196 with open("%s.%s" % (basename, ext), "wb") as f:
197 f.write(headers.read())
108 print(" downloaded %s.%s" % (basename, ext)) 198 print(" downloaded %s.%s" % (basename, ext))
109 return 0 199 return 0
110 except TimeoutError: 200 except TimeoutError:
111 return 1 201 return 1
112 except HTTPError: 202 except HTTPError:
113 print(" video not available on the Wayback Machine!") 203 # dont keep trying
114 return 0 204 return 2
115 except Exception as e: 205 except Exception as e:
116 print(" unknown error downloading video!\n") 206 print(" unknown error downloading video!")
117 print(e) 207 print(e)
118 return 0
119
120
121 def ia_file_legit(path: str, vidid: str) -> bool:
122 return True if re.search(''.join([r"((?:.+?-)?", vidid, r"\.(?:mp4|jpg|web"
123 r"p|mkv|webm|info\\.json|description|annotations.xml"
124 "))"]),
125 path) else False
126
127
128 def internet_archive_dl(video: dict, basename: str, output: str) -> int:
129 if internetarchive.get_item("youtube-%s" % video["id"]).exists:
130 flist = [f.name for f in internetarchive.get_files("youtube-%s" % video["id"]) if ia_file_legit(f.name, video["id"])]
131 while True:
132 try:
133 internetarchive.download("youtube-%s" % video["id"],
134 files=flist, verbose=True,
135 destdir=output,
136 no_directory=True,
137 ignore_existing=True,
138 retries=9999)
139 break
140 except ConnectTimeout:
141 continue
142 except Exception as e:
143 print(e)
144 return 0
145 if flist[0][:len(video["id"])] == video["id"]:
146 for fname in flist:
147 if os.path.exists("%s/%s" % (output, fname)):
148 os.replace("%s/%s" % (output, fname),
149 "%s-%s" % (basename.rsplit("-", 1)[0],
150 fname))
151 return 1 208 return 1
209
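# A sketch of the CDX thumbnail lookup described in the TODO above. The
# query parameters are the Wayback Machine's documented CDX ones, but the
# i.ytimg.com/vi/<id>/hqdefault.jpg pattern is an assumption about which
# thumbnail URL was captured.
def wayback_thumbnail_dl(video: dict, basename: str) -> int:
    import json as stdjson  # the module-level json may be orjson et al.
    cdx = ("https://web.archive.org/cdx/search/cdx?url=i.ytimg.com/vi/%s"
           "/hqdefault.jpg&output=json&filter=statuscode:200&limit=1"
           % video["id"])
    try:
        with urllib.request.urlopen(cdx) as resp:
            rows = stdjson.load(resp)
        if len(rows) < 2:  # row 0 is the header; no captures found
            return 2
        timestamp, original = rows[1][1], rows[1][2]
        # "id_" asks for the original bytes, without the Wayback banner
        snap = "https://web.archive.org/web/%sid_/%s" % (timestamp,
                                                         original)
        with urllib.request.urlopen(snap) as resp:
            with open(basename + ".jpg", "wb") as f:
                f.write(resp.read())
        return 0
    except HTTPError:
        return 2
    except Exception:
        return 1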
210
211 # Internet Archive (tubeup)
212 def ia_dl(video: dict, basename: str, output: str) -> int:
213 def ia_file_legit(f: internetarchive.File, vidid: str) -> bool:
214 # FIXME:
215 #
216 # There are some items on IA that combine the old tubeup behavior
217 # (i.e., including the sanitized video name before the ID)
218 # and the new tubeup behavior (filename only contains the video ID)
219 # hence we will download the entire video twice.
220 #
221 # This isn't much of a problem anymore (and hasn't been for like 3
222 # years), since I contributed code to not upload something if there
223 # is already something there. However we should handle this case
224 # anyway.
225 #
226 # Additionally, there are some items that have duplicate video files
227 # (from when the owners changed the title). We should ideally only
228 # download unique files. IA seems to provide SHA1 hashes...
229 #
230 # We should also check whether the copy on IA is higher quality
231 # than a local copy... :)
232 if not re.search(r"((?:.+?-)?" + vidid + r"\.(?:mp4|jpg|webp|mkv|w"
233 r"ebm|info\.json|description|annotations.xml))",
234 f.name):
235 return False
236
237 # now, check the metadata
238 print(f)
239 return True
240
241
242 if not internetarchive.get_item("youtube-%s" % video["id"]).exists:
243 return 2
244
245 flist = [
246 f.name
247 for f in internetarchive.get_files("youtube-%s" % video["id"])
248 if ia_file_legit(f, video["id"])
249 ]
250 while True:
251 try:
252 internetarchive.download("youtube-%s" % video["id"], files=flist,
253 verbose=True, destdir=output,
254 no_directory=True, ignore_existing=True,
255 retries=9999)
256 break
257 except ConnectTimeout:
258 time.sleep(1)
259 continue
260 except Exception as e:
261 print(e)
262 return 1
263
264 # Newer versions of tubeup save only the video ID.
265 # Account for this by replacing it.
266 #
267 # paper/2025-08-30: fixed a bug where video IDs with hyphens
268 # would incorrectly truncate
269 for fname in flist:
270 # ignore any files whose names are not simply the ID
271 if os.path.splitext(fname)[0] != video["id"]:
272 continue
273
274 if os.path.exists("%s/%s" % (output, fname)):
275 os.replace("%s/%s" % (output, fname),
276 "%s.%s" % (basename, os.path.splitext(fname))[1])
152 return 0 277 return 0
153 278
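# One way to tackle the duplicate-file FIXME inside ia_dl above: collapse
# the file list by content hash before downloading. This assumes IA's File
# objects expose a 'sha1' attribute from the item metadata; if they don't,
# the name fallback simply means no deduplication happens.
def dedupe_ia_files(files: list) -> list:
    seen = set()
    unique = []
    for f in files:
        key = getattr(f, "sha1", None) or f.name
        if key not in seen:
            seen.add(key)
            unique.append(f)
    return unique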
154 279
155 ytdl_opts = { 280 def ytdlp_dl(video: dict, basename: str, output: str) -> int:
156 "retries": 100, 281 # intentionally ignores all messages besides errors
157 "nooverwrites": True, 282 class MyLogger(object):
158 "call_home": False, 283 def debug(self, msg):
159 "quiet": True, 284 pass
160 "writeinfojson": True, 285
161 "writedescription": True, 286 def warning(self, msg):
162 "writethumbnail": True, 287 pass
163 "writeannotations": True, 288
164 "writesubtitles": True, 289 def error(self, msg):
165 "allsubtitles": True, 290 print(" " + msg)
166 "addmetadata": True, 291 pass
167 "continuedl": True, 292
168 "embedthumbnail": True, 293
169 "format": "bestvideo+bestaudio/best", 294 def ytdl_hook(d) -> None:
170 "restrictfilenames": True, 295 if d["status"] == "finished":
171 "no_warnings": True, 296 print(" downloaded %s: 100%% " % (os.path.basename(d["filename"])))
172 "progress_hooks": [ytdl_hook], 297 if d["status"] == "downloading":
173 "logger": MyLogger(), 298 print(" downloading %s: %s\r" % (os.path.basename(d["filename"]),
174 "ignoreerrors": False, 299 d["_percent_str"]), end="")
175 } 300 if d["status"] == "error":
301 print("\n an error occurred downloading %s!"
302 % (os.path.basename(d["filename"])))
303
304 ytdl_opts = {
305 "retries": 100,
306 "nooverwrites": True,
307 "call_home": False,
308 "quiet": True,
309 "writeinfojson": True,
310 "writedescription": True,
311 "writethumbnail": True,
312 "writeannotations": True,
313 "writesubtitles": True,
314 "allsubtitles": True,
315 "addmetadata": True,
316 "continuedl": True,
317 "embedthumbnail": True,
318 "format": "bestvideo+bestaudio/best",
319 "restrictfilenames": True,
320 "no_warnings": True,
321 "progress_hooks": [ytdl_hook],
322 "logger": MyLogger(),
323 "ignoreerrors": False,
324
325 #mm, output template
326 "outtmpl": output + "/%(title)s-%(id)s.%(ext)s",
327 }
328
329 with youtube_dl.YoutubeDL(ytdl_opts) as ytdl:
330 try:
331 ytdl.extract_info("https://youtube.com/watch?v=%s" % video["id"])
332 return 0
333 except DownloadError:
334 return 2
335 except Exception as e:
336 print(" unknown error downloading video!\n")
337 print(e)
338
339 return 1
340
341
342 # TODO: There are multiple other youtube archival websites available.
343 # Most notable is https://findyoutubevideo.thetechrobo.ca .
344 # This combines a lot of sparse youtube archival services, and has
345 # a convenient API we can use. Nice!
346 #
347 # There is also the "Distributed YouTube Archive" which is totally
348 # useless because there's no way to automate it...
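# If that finder service is ever wired in, the probe could look roughly
# like this. NOTE: the /api/v4/<id> path is a guess at the API shape, not
# its documented interface; check the service's docs before relying on it.
def finder_probe(video: dict) -> dict:
    url = ("https://findyoutubevideo.thetechrobo.ca/api/v4/%s"
           % video["id"])  # hypothetical endpoint
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())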
349
350 ##############################################################################
176 351
177 352
178 def main(): 353 def main():
354 # generator; creates a list of files, and returns the parsed form of
355 # each. note that the parser is not necessarily the built-in json module
356 def load_split_files(path: str):
357 list_files = []
358
359 # build the path list
360 if not os.path.isdir(path):
361 list_files.append(path)
362 else:
363 for fi in os.listdir(path):
364 if re.search(r"vids[0-9\-]+?\.json", fi):
365 list_files.append(path + "/" + fi)
366
367 # now open each as a json
368 for fi in list_files:
369 print(fi)
370 with open(fi, "r", encoding="utf-8") as infile:
371 if simdjson:
372 # Using this is a lot faster in SIMDJSON, since instead
373 # of converting all of the JSON key/value pairs into
374 # native Python objects, they stay in an internal state.
375 #
376 # This means we only get the stuff we absolutely need,
377 # which is the uploader ID, and copy everything else
378 # if the ID is one we are looking for.
379 parser = json.Parser()
380 yield parser.parse(infile.read())
381 del parser
382 else:
383 yield json.load(infile)
384
385
386 def write_metadata(i: dict, basename: str) -> None:
387 # ehhh
388 if not os.path.exists(basename + ".info.json"):
389 with open(basename + ".info.json", "w", encoding="utf-8") as jsonfile:
390 try:
391 # orjson outputs bytes
392 jsonfile.write(json.dumps(i).decode("utf-8"))
393 except AttributeError:
394 # everything else outputs a string
395 jsonfile.write(json.dumps(i))
396 print(" saved %s" % os.path.basename(jsonfile.name))
397 if not os.path.exists(basename + ".description"):
398 with open(basename + ".description", "w",
399 encoding="utf-8") as descfile:
400 descfile.write(i["description"])
401 print(" saved %s" % os.path.basename(descfile.name))
402
179 args = docopt.docopt(__doc__) 403 args = docopt.docopt(__doc__)
180 404
181 if not os.path.exists(args["--output"]): 405 if not os.path.exists(args["--output"]):
182 os.mkdir(args["--output"]) 406 os.mkdir(args["--output"])
183 407
184 for f in load_split_files(args["--database"]): 408 channels = dict()
185 for i in f: 409
186 uploader = i["uploader_id"] if "uploader_id" in i else None 410 for url in args["<url>"]:
187 for url in args["<url>"]: 411 chn = url.split("/")[-1]
188 channel = url.split("/")[-1] 412 channels[chn] = {"output": "%s/%s" % (args["--output"], chn)}
189 413
190 output = "%s/%s" % (args["--output"], channel) 414 for channel in channels.values():
191 if not os.path.exists(output): 415 if not os.path.exists(channel["output"]):
192 os.mkdir(output) 416 os.mkdir(channel["output"])
193 ytdl_opts["outtmpl"] = output + "/%(title)s-%(id)s.%(ext)s" 417
194 418 # find videos in the database.
195 if uploader == channel: 419 #
196 print(uploader, channel) 420 # despite how it may seem, this is actually really fast, and fairly
197 print("%s:" % i["id"]) 421 # memory efficient too (but really only if we're using simdjson...)
198 basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], 422 videos = [
199 restricted=True), i["id"]) 423 i if not simdjson else i.as_dict()
200 files = [y for p in ["mkv", "mp4", "webm"] for y in list(Path(output).glob(("*-%s." + p) % i["id"]))] 424 for f in load_split_files(args["--database"])
201 if files: 425 for i in (f["videos"] if "videos" in f else f) # accept a bare array or a {"videos": [...]} wrapper
202 print(" video already downloaded!") 426 if "uploader_id" in i and i["uploader_id"] in channels
203 write_metadata(i, basename) 427 ]
428
429 while True:
430 if len(videos) == 0:
431 break
432
433 videos_copy = videos[:] # real copy; we remove from videos while iterating
434
435 for i in videos_copy:
436 channel = channels[i["uploader_id"]]
437
438 # precalculated for speed
439 output = channel["output"]
440
441 print("%s:" % i["id"])
442 basename = "%s/%s-%s" % (output, sanitize_filename(i["title"],
443 restricted=True), i["id"])
444 files = [y
445 for p in ["mkv", "mp4", "webm"]
446 for y in Path(output).glob(("*-%s." + p) % i["id"])]
447 if files:
448 print(" video already downloaded!")
449 videos.remove(i)
450 write_metadata(i, basename)
451 continue
452
453 # high level "download" function.
454 def dl(video: dict, basename: str, output: str):
455 dls = []
456
457 if ytdlp_works:
458 dls.append({
459 "func": ytdlp_dl,
460 "name": "using yt-dlp",
461 })
462
463 if ia_works:
464 dls.append({
465 "func": ia_dl,
466 "name": "from the Internet Archive",
467 })
468
469 dls.append({
470 "func": desirintoplaisir_dl,
471 "name": "from LMIJLM/DJ Plaisir's archive",
472 })
473 dls.append({
474 "func": ghostarchive_dl,
475 "name": "from GhostArchive"
476 })
477 dls.append({
478 "func": wayback_dl,
479 "name": "from the Wayback Machine"
480 })
481
482 for d in dls: # 'd', to avoid shadowing this dl() function
483 print(" attempting to download %s" % d["name"])
484 r = d["func"](video, basename, output)
485 if r == 0:
486 # all good, video's downloaded
487 return 0
488 elif r == 2:
489 # video is unavailable here
490 print(" oops, video is not available there...")
204 continue 491 continue
205 # this code is *really* ugly... todo a rewrite? 492 elif r == 1:
206 with youtube_dl.YoutubeDL(ytdl_opts) as ytdl: 493 # error while downloading; likely temporary.
207 try: 494 # TODO we should save which downloader the video
208 ytdl.extract_info("https://youtube.com/watch?v=%s" 495 # was on, so we can continue back at it later.
209 % i["id"]) 496 return 1
210 continue 497 # video is unavailable everywhere
211 except DownloadError: 498 return 2
212 print(" video is not available! attempting to find In" 499
213 "ternet Archive pages of it...") 500 r = dl(i, basename, output)
214 except Exception as e: 501 if r == 1:
215 print(" unknown error downloading video!\n") 502 continue
216 print(e) 503
217 if internet_archive_dl(i, basename, output): # if we can't download from IA 504 # video is downloaded, or it's totally unavailable, so
218 continue 505 # remove it from being checked again.
219 print(" video does not have a Internet Archive page! attem" 506 videos.remove(i)
220 "pting to download from the Wayback Machine...") 507 # ... and then dump the metadata, if there isn't any on disk.
221 while True: 508 write_metadata(i, basename)
222 if wayback_machine_dl(i, basename) == 0: # success 509
223 break 510 if r == 0:
224 time.sleep(5) 511 # video is downloaded
225 continue 512 continue
226 write_metadata(i, basename) 513
514 # video is unavailable everywhere; the metadata we just wrote is all we get.
515 print(" video is unavailable everywhere; dumping out metadata only")
227 516
228 517
229 if __name__ == "__main__": 518 if __name__ == "__main__":
230 main() 519 main()