comparison kemonopartydownloader.py @ 115:f10492e8720b
kemonopartydownloader.py: add youtube downloader stubs and dump json to disk
| author | Paper <mrpapersonic@gmail.com> |
|---|---|
| date | Mon, 23 Jan 2023 23:58:22 -0500 |
| parents | b14e2a096ebf |
| children | |
| 114:80bd4a99ea00 | 115:f10492e8720b |
|---|---|
| 24 import requests # pip install requests | 24 import requests # pip install requests |
| 25 import time | 25 import time |
| 26 import math | 26 import math |
| 27 import zipfile | 27 import zipfile |
| 28 import urllib.parse | 28 import urllib.parse |
| | 29 import yt_dlp |
| | 30 from yt_dlp.utils import sanitize_filename as sanitize |
| 29 from urllib.error import HTTPError | 31 from urllib.error import HTTPError |
| 30 from http.client import BadStatusLine | 32 from http.client import BadStatusLine |
| 31 | 33 |
| 32 | 34 |
| 33 def download_folder_from_google_drive(link: str) -> int: | 35 def download_folder_from_google_drive(link: str) -> int: |
| 125 if filename.endswith(".zip"): | 127 if filename.endswith(".zip"): |
| 126 unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0])) | 128 unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0])) |
| 127 os.remove(filepath) | 129 os.remove(filepath) |
| 128 | 130 |
| 129 | 131 |
| | 132 def download_from_youtube(link: str) -> int: # int is the response |
| | 133 return 0 # just a stub for now |
| | 134 |
| | 135 |
| 130 # https://stackoverflow.com/a/39225039 | 136 # https://stackoverflow.com/a/39225039 |
| 131 def download_file_from_google_drive(drive_id: str, out: str = "") -> None: | 137 def download_file_from_google_drive(drive_id: str, out: str = "") -> None: |
| 132 def get_confirm_token(response: requests.Response): | 138 def get_confirm_token(response: requests.Response): |
| 133 for key, value in response.cookies.items(): | 139 for key, value in response.cookies.items(): |
| 134 if key.startswith('download_warning'): | 140 if key.startswith('download_warning'): |
| 192 params = {'id': drive_id, 'confirm': token} | 198 params = {'id': drive_id, 'confirm': token} |
| 193 response = session.get(URL, headers=headers, params=params, | 199 response = session.get(URL, headers=headers, params=params, |
| 194 stream=True) | 200 stream=True) |
| 195 | 201 |
| 196 save_response_content(response) | 202 save_response_content(response) |
| 197 | |
| 198 | |
| 199 def sanitize(filename: str) -> str: | |
| 200 return re.sub(r"[\/:*?\"<>|]", "_", filename).strip() | |
| 201 | 203 |
| 202 | 204 |
| 203 def find_urls(s: str) -> list: | 205 def find_urls(s: str) -> list: |
| 204 url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" + | 206 url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" + |
| 205 """%[0-9a-fA-F][0-9a-fA-F]))+""") | 207 """%[0-9a-fA-F][0-9a-fA-F]))+""") |
| 243 | 245 |
| 244 | 246 |
| 245 def parse_json(i: dict, count: int) -> None: | 247 def parse_json(i: dict, count: int) -> None: |
| 246 unique_urls = [] | 248 unique_urls = [] |
| 247 for url in find_urls(i["content"]): | 249 for url in find_urls(i["content"]): |
| | 250 # spaghetti |
| 248 parsed_url = urllib.parse.urlparse(url) | 251 parsed_url = urllib.parse.urlparse(url) |
| 249 if parsed_url.netloc == "drive.google.com": | 252 if parsed_url.netloc == "drive.google.com": |
| 250 if parsed_url.path.startswith("/drive/folders"): | 253 if parsed_url.path.startswith("/drive/folders"): |
| 251 if url not in unique_urls: | 254 if url not in unique_urls: |
| 252 download_folder_from_google_drive(url) | 255 download_folder_from_google_drive(url) |
| 265 unique_urls.append(url) | 268 unique_urls.append(url) |
| 266 elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]: | 269 elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]: |
| 267 if url not in unique_urls: | 270 if url not in unique_urls: |
| 268 download_from_dropbox(url) | 271 download_from_dropbox(url) |
| 269 unique_urls.append(url) | 272 unique_urls.append(url) |
| | 273 elif parsed_url.netloc in ["youtube.com", "youtu.be", "www.youtube.com"]: |
| | 274 if url not in unique_urls: |
| | 275 download_from_youtube(url) |
| | 276 unique_urls.append(url) |
| 270 for x in i["attachments"]: | 277 for x in i["attachments"]: |
| 271 count += 1 | 278 count += 1 |
| 272 while not os.path.exists("%s/%s_%dp_%s_%s" | 279 while not os.path.exists("%s/%s_%dp_%s_%s" |
| 273 % (output, i["id"], count, | 280 % (output, i["id"], count, |
| 274 sanitize(i["title"]), x["name"])): | 281 sanitize(i["title"]), x["name"])): |
| 339 % (service, user, (page * 25))).json() | 346 % (service, user, (page * 25))).json() |
| 340 for i in userdata: | 347 for i in userdata: |
| 341 print(i["id"]) | 348 print(i["id"]) |
| 342 count = 0 | 349 count = 0 |
| 343 parse_json(i, count) | 350 parse_json(i, count) |
| | 351 filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count, |
| | 352 sanitize(i["title"])) |
| | 353 json.dump(i, filename) |
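
This changeset swaps the hand-rolled `sanitize` helper (old lines 199-200) for yt-dlp's `sanitize_filename`, imported under the same name so existing call sites such as `sanitize(i["title"])` keep working. A minimal sketch of the replacement, assuming yt-dlp is installed (`pip install yt-dlp`):

```python
from yt_dlp.utils import sanitize_filename as sanitize

# sanitize_filename strips or replaces characters that are not legal in
# file names on common filesystems, covering the same cases as the
# removed regex r"[\/:*?\"<>|]"
print(sanitize('my video: part 1/2?'))
```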
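
`download_from_youtube` is committed only as a stub (it always returns 0), but the new `yt_dlp` import hints at how it could later be filled in; `parse_json` already routes youtube.com, youtu.be, and www.youtube.com URLs to it (new lines 273-276). A sketch under that assumption; the options dict and output template here are hypothetical, not part of this changeset:

```python
import yt_dlp


def download_from_youtube(link: str) -> int:  # int is the response
    # hypothetical body for the stub: YoutubeDL.download() returns a
    # retcode that is 0 on success, matching the stub's "return 0"
    ydl_opts = {
        "outtmpl": "%(title)s-%(id)s.%(ext)s",  # assumed output template
        "quiet": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        return ydl.download([link])
```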
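
One caveat in the final hunk: `json.dump(i, filename)` (new line 353) passes a path string where `json.dump` expects a writable file object, so the dump-to-disk step will raise `AttributeError` at runtime. A corrected sketch, reusing the changeset's own `output`, `i`, `count`, and `sanitize` names:

```python
import json

filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count,
                                       sanitize(i["title"]))
# json.dump writes to a file object, not a path, so open the file first
with open(filename, "w", encoding="utf-8") as f:
    json.dump(i, f)
```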
