Mercurial > channeldownloader
comparison channeldownloader.py @ 16:088d9a3a2524
improvements to IA downloader
now we explicitly ignore any file not marked "original". this seems to
filter out derivative files (such as ogv and other shit we don't
want) but keeps some of the top-level metadata
| author | Paper <paper@tflc.us> |
|---|---|
| date | Sat, 28 Feb 2026 14:38:04 -0500 |
| parents | 615e1ca0212a |
| children | 0d10b2ce0140 |
comparison
equal
deleted
inserted
replaced
| 15:615e1ca0212a | 16:088d9a3a2524 |
|---|---|
| 43 import time | 43 import time |
| 44 import urllib.request | 44 import urllib.request |
| 45 import os | 45 import os |
| 46 import ssl | 46 import ssl |
| 47 import io | 47 import io |
| 48 import shutil | |
| 49 import xml.etree.ElementTree as XmlET | |
| 48 from urllib.error import HTTPError | 50 from urllib.error import HTTPError |
| 49 from pathlib import Path | 51 from pathlib import Path |
| 50 | 52 |
| 51 # We can utilize special simdjson features if it is available | 53 # We can utilize special simdjson features if it is available |
| 52 simdjson = False | 54 simdjson = False |
| 216 print(" unknown error downloading video!") | 218 print(" unknown error downloading video!") |
| 217 print(e) | 219 print(e) |
| 218 return 1 | 220 return 1 |
| 219 | 221 |
| 220 | 222 |
| 223 # Also captures the ID for comparison | |
| 224 IA_REGEX = re.compile(r"(?:(?P<date>\d{8}) - )?(?P<title>.+?)?(?:-| \[)?(?:(?P<id>[A-z0-9_\-]{11})]?|(?: \((?P<format>(?:(?:(?P<resolution>\d+)p_(?P<fps>\d+)fps_(?P<vcodec>H264)-)?(?P<abitrate>\d+)kbit_(?P<acodec>AAC|Vorbis))|BQ|Description)\)))\.(?P<extension>mp4|info\.json|description|annotations\.xml|webp|mkv|webm|jpg|jpeg|ogg|txt|m4a)$") | |
| 225 | |
| 226 | |
| 221 # Internet Archive (tubeup) | 227 # Internet Archive (tubeup) |
| 222 def ia_dl(video: dict, basename: str, output: str) -> int: | 228 def ia_dl(video: dict, basename: str, output: str) -> int: |
| 223 def ia_file_legit(f: str, vidid: str) -> bool: | 229 def ia_file_legit(f: str, vidid: str, vidtitle: str) -> bool: |
| 224 # FIXME: | 230 # FIXME: |
| 225 # | 231 # |
| 226 # There are some items on IA that combine the old tubeup behavior | 232 # There are some items on IA that combine the old tubeup behavior |
| 227 # (i.e., including the sanitized video name before the ID) | 233 # (i.e., including the sanitized video name before the ID) |
| 228 # and the new tubeup behavior (filename only contains the video ID) | 234 # and the new tubeup behavior (filename only contains the video ID) |
| 237 # (from when the owners changed the title). We should ideally only | 243 # (from when the owners changed the title). We should ideally only |
| 238 # download unique files. IA seems to provide SHA1 hashes... | 244 # download unique files. IA seems to provide SHA1 hashes... |
| 239 # | 245 # |
| 240 # We should also check whether the copy on IA is higher quality | 246 # We should also check whether the copy on IA is higher quality |
| 241 # than a local copy... :) | 247 # than a local copy... :) |
| 242 if not re.search(r"((?:.+?-)?" + vidid + r"\.(?:mp4|jpg|webp|mkv|w" | 248 |
| 243 r"ebm|info\.json|description|annotations.xml))", | 249 IA_ID = "youtube-%s" % vidid |
| 244 f): | 250 |
| 251 # Ignore IA generated thumbnails | |
| 252 if f.startswith("%s.thumbs/" % IA_ID) or f == "__ia_thumb.jpg": | |
| 245 return False | 253 return False |
| 246 | 254 |
| 247 return True | 255 for i in ["_archive.torrent", "_files.xml", "_meta.sqlite", "_meta.xml"]: |
| 248 | 256 if f == (IA_ID + i): |
| 249 | 257 return False |
| 250 if not internetarchive.get_item("youtube-%s" % video["id"]).exists: | 258 |
| 259 # Try to match with our known filename regex | |
| 260 # This properly matches: | |
| 261 # ??????????? - YYYYMMDD - TITLE [ID].EXTENSION | |
| 262 # old tubeup - TITLE-ID.EXTENSION | |
| 263 # tubeup - ID.EXTENSION | |
| 264 # JDownloader - TITLE (FORMAT).EXTENSION | |
| 265 # (Possibly we should match other filenames too??) | |
| 266 m = re.match(IA_REGEX, f) | |
| 267 if m is None: | |
| 268 return False | |
| 269 | |
| 270 if m.group("id"): | |
| 271 return (m.group("id") == vidid) | |
| 272 elif m.group("title") is not None: | |
| 273 def asciify(s: str) -> str: | |
| 274 # Replace non-printable/non-ASCII chars and slashes with underscores, then trim surrounding whitespace | |
| 275 return ''.join([i if ord(i) >= 0x20 and ord(i) < 0x80 and i not in "/\\" else '_' for i in s]).strip() | |
| 276 | |
| 277 if asciify(m.group("title")) == asciify(vidtitle): | |
| 278 return True # Close enough | |
| 279 | |
| 280 # Uh oh | |
| 281 return False | |
| 282 | |
| 283 def ia_get_original_files(identifier: str) -> typing.Optional[list]: | |
| 284 def ia_xml(identifier: str) -> typing.Optional[str]: | |
| 285 for _ in range(1, 9999): | |
| 286 try: | |
| 287 with urllib.request.urlopen("https://archive.org/download/%s/%s_files.xml" % (identifier, identifier)) as req: | |
| 288 return req.read().decode("utf-8") | |
| 289 except HTTPError as e: | |
| 290 if e.code == 404 or e.code == 503: | |
| 291 return None | |
| 292 time.sleep(5) | |
| 293 | |
| 294 d = ia_xml(identifier) | |
| 295 if d is None: | |
| 296 return None | |
| 297 | |
| 298 try: | |
| 299 # Now parse the XML and make a list of each original file | |
| 300 return [x.attrib["name"] for x in filter(lambda x: x.attrib["source"] == "original", XmlET.fromstring(d))] | |
| 301 except Exception as e: | |
| 302 print(e) | |
| 303 return None | |
| 304 | |
| 305 originalfiles = ia_get_original_files("youtube-%s" % video["id"]) | |
| 306 if not originalfiles: | |
| 251 return 2 | 307 return 2 |
| 252 | 308 |
| 253 flist = [ | 309 flist = [ |
| 254 f.name | 310 f |
| 255 for f in internetarchive.get_files("youtube-%s" % video["id"]) | 311 for f in originalfiles |
| 256 if ia_file_legit(f.name, video["id"]) | 312 if ia_file_legit(f, video["id"], video["title"] if not "fulltitle" in video else video["fulltitle"]) |
| 257 ] | 313 ] |
| 314 | |
| 315 if not flist: | |
| 316 return 2 # ?????? | |
| 258 | 317 |
| 259 while True: | 318 while True: |
| 260 try: | 319 try: |
| 261 internetarchive.download("youtube-%s" % video["id"], files=flist, | 320 internetarchive.download("youtube-%s" % video["id"], files=flist, |
| 262 verbose=True, | 321 verbose=True, ignore_existing=True, |
| 263 no_directory=True, ignore_existing=True, | |
| 264 retries=9999) | 322 retries=9999) |
| 265 break | 323 break |
| 266 except ConnectTimeout: | 324 except ConnectTimeout: |
| 267 time.sleep(1) | 325 time.sleep(1) |
| 268 continue | 326 continue |
| 277 # would incorrectly truncate | 335 # would incorrectly truncate |
| 278 # | 336 # |
| 279 # paper/2026-02-27: an update in the IA python library changed | 337 # paper/2026-02-27: an update in the IA python library changed |
| 280 # the way destdir works, so it just gets entirely ignored. | 338 # the way destdir works, so it just gets entirely ignored. |
| 281 for fname in flist: | 339 for fname in flist: |
| 282 def whitelist(s: str, vidid: str) -> bool: | 340 def getext(s: str, vidid: str) -> typing.Optional[str]: |
| 283 # special case: .info.json files | 341 # special cases |
| 284 if s == ("%s.info.json" % vidid): | 342 for i in [".info.json", ".annotations.xml"]: |
| 285 return ".info.json" | 343 if s.endswith(i): |
| 286 | 344 return i |
| 287 spli = os.path.splitext(fname) | 345 |
| 288 if spli is None or len(spli) != 2 or spli[0] != vidid: | 346 # Handle JDownloader "TITLE (Description).txt" |
| 347 if s.endswith(" (Description).txt"): | |
| 348 return ".description" | |
| 349 | |
| 350 # Catch-all for remaining extensions | |
| 351 spli = os.path.splitext(s) | |
| 352 if spli is None or len(spli) != 2: | |
| 289 return None | 353 return None |
| 290 | 354 |
| 291 return spli[1] | 355 return spli[1] |
| 292 | 356 |
| 293 if not os.path.exists(fname): | 357 ondisk = "youtube-%s/%s" % (video["id"], fname) |
| 358 | |
| 359 if not os.path.exists(ondisk): | |
| 294 continue | 360 continue |
| 295 | 361 |
| 296 ext = whitelist(fname, video["id"]) | 362 ext = getext(fname, video["id"]) |
| 297 if ext is None: | 363 if ext is None: |
| 298 continue | 364 continue |
| 299 | 365 |
| 300 os.replace(fname, "%s%s" % (basename, ext)) | 366 os.replace(ondisk, "%s%s" % (basename, ext)) |
| 367 | |
| 368 shutil.rmtree("youtube-%s" % video["id"]) | |
| 369 | |
| 301 return 0 | 370 return 0 |
| 302 | 371 |
| 303 | 372 |
| 304 def ytdlp_dl(video: dict, basename: str, output: str) -> int: | 373 def ytdlp_dl(video: dict, basename: str, output: str) -> int: |
| 305 # intentionally ignores all messages besides errors | 374 # intentionally ignores all messages besides errors |
| 468 output = channel["output"] | 537 output = channel["output"] |
| 469 | 538 |
| 470 print("%s:" % i["id"]) | 539 print("%s:" % i["id"]) |
| 471 basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], | 540 basename = "%s/%s-%s" % (output, sanitize_filename(i["title"], |
| 472 restricted=True), i["id"]) | 541 restricted=True), i["id"]) |
| 473 files = [y | 542 def filenotworthit(f) -> bool: |
| 543 try: | |
| 544 return bool(os.path.getsize(f)) | |
| 545 except: | |
| 546 return False | |
| 547 | |
| 548 pathoutput = Path(output) | |
| 549 | |
| 550 # This is terrible | |
| 551 files = list(filter(filenotworthit, [y | |
| 474 for p in ["mkv", "mp4", "webm"] | 552 for p in ["mkv", "mp4", "webm"] |
| 475 for y in Path(output).glob(("*-%s." + p) % i["id"])] | 553 for y in pathoutput.glob(("*-%s." + p) % i["id"])])) |
| 476 if files: | 554 if files: |
| 477 print(" video already downloaded!") | 555 print(" video already downloaded!") |
| 478 videos.remove(i) | 556 videos.remove(i) |
| 479 write_metadata(i, basename) | 557 write_metadata(i, basename) |
| 480 continue | 558 continue |
