Mercurial > codedump
comparison kemonopartydownloader.py @ 115:f10492e8720b
kemonopartydownloader.py: add youtube downloader stubs and dump json to disk
author | Paper <mrpapersonic@gmail.com> |
---|---|
date | Mon, 23 Jan 2023 23:58:22 -0500 |
parents | b14e2a096ebf |
children |
comparison
equal
deleted
inserted
replaced
114:80bd4a99ea00 | 115:f10492e8720b |
---|---|
24 import requests # pip install requests | 24 import requests # pip install requests |
25 import time | 25 import time |
26 import math | 26 import math |
27 import zipfile | 27 import zipfile |
28 import urllib.parse | 28 import urllib.parse |
29 import yt_dlp | |
30 from yt_dlp.utils import sanitize_filename as sanitize | |
29 from urllib.error import HTTPError | 31 from urllib.error import HTTPError |
30 from http.client import BadStatusLine | 32 from http.client import BadStatusLine |
31 | 33 |
32 | 34 |
33 def download_folder_from_google_drive(link: str) -> int: | 35 def download_folder_from_google_drive(link: str) -> int: |
125 if filename.endswith(".zip"): | 127 if filename.endswith(".zip"): |
126 unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0])) | 128 unzip(filepath, urllib.parse.unquote(os.path.splitext(filepath)[0])) |
127 os.remove(filepath) | 129 os.remove(filepath) |
128 | 130 |
129 | 131 |
def download_from_youtube(link: str) -> int:  # int is the response
    """Placeholder for the future yt-dlp based YouTube downloader.

    Always reports success (0) for now; *link* is accepted but unused.
    """
    return 0
134 | |
135 | |
130 # https://stackoverflow.com/a/39225039 | 136 # https://stackoverflow.com/a/39225039 |
131 def download_file_from_google_drive(drive_id: str, out: str = "") -> None: | 137 def download_file_from_google_drive(drive_id: str, out: str = "") -> None: |
132 def get_confirm_token(response: requests.Response): | 138 def get_confirm_token(response: requests.Response): |
133 for key, value in response.cookies.items(): | 139 for key, value in response.cookies.items(): |
134 if key.startswith('download_warning'): | 140 if key.startswith('download_warning'): |
192 params = {'id': drive_id, 'confirm': token} | 198 params = {'id': drive_id, 'confirm': token} |
193 response = session.get(URL, headers=headers, params=params, | 199 response = session.get(URL, headers=headers, params=params, |
194 stream=True) | 200 stream=True) |
195 | 201 |
196 save_response_content(response) | 202 save_response_content(response) |
197 | |
198 | |
def sanitize(filename: str) -> str:
    """Make *filename* safe for the local filesystem.

    Characters that are illegal in Windows filenames (/ : * ? " < > |)
    are each replaced with an underscore, and surrounding whitespace is
    trimmed from the result.
    """
    cleaned = re.sub(r"[\/:*?\"<>|]", "_", filename)
    return cleaned.strip()
201 | 203 |
202 | 204 |
203 def find_urls(s: str) -> list: | 205 def find_urls(s: str) -> list: |
204 url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" + | 206 url_regex = (r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:""" + |
205 """%[0-9a-fA-F][0-9a-fA-F]))+""") | 207 """%[0-9a-fA-F][0-9a-fA-F]))+""") |
243 | 245 |
244 | 246 |
245 def parse_json(i: dict, count: int) -> None: | 247 def parse_json(i: dict, count: int) -> None: |
246 unique_urls = [] | 248 unique_urls = [] |
247 for url in find_urls(i["content"]): | 249 for url in find_urls(i["content"]): |
250 # spaghetti | |
248 parsed_url = urllib.parse.urlparse(url) | 251 parsed_url = urllib.parse.urlparse(url) |
249 if parsed_url.netloc == "drive.google.com": | 252 if parsed_url.netloc == "drive.google.com": |
250 if parsed_url.path.startswith("/drive/folders"): | 253 if parsed_url.path.startswith("/drive/folders"): |
251 if url not in unique_urls: | 254 if url not in unique_urls: |
252 download_folder_from_google_drive(url) | 255 download_folder_from_google_drive(url) |
265 unique_urls.append(url) | 268 unique_urls.append(url) |
266 elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]: | 269 elif parsed_url.netloc in ["dropbox.com", "www.dropbox.com"]: |
267 if url not in unique_urls: | 270 if url not in unique_urls: |
268 download_from_dropbox(url) | 271 download_from_dropbox(url) |
269 unique_urls.append(url) | 272 unique_urls.append(url) |
273 elif parsed_url.netloc in ["youtube.com", "youtu.be", "www.youtube.com"]: | |
274 if url not in unique_urls: | |
275 download_from_youtube(url) | |
276 unique_urls.append(url) | |
270 for x in i["attachments"]: | 277 for x in i["attachments"]: |
271 count += 1 | 278 count += 1 |
272 while not os.path.exists("%s/%s_%dp_%s_%s" | 279 while not os.path.exists("%s/%s_%dp_%s_%s" |
273 % (output, i["id"], count, | 280 % (output, i["id"], count, |
274 sanitize(i["title"]), x["name"])): | 281 sanitize(i["title"]), x["name"])): |
339 % (service, user, (page * 25))).json() | 346 % (service, user, (page * 25))).json() |
340 for i in userdata: | 347 for i in userdata: |
341 print(i["id"]) | 348 print(i["id"]) |
342 count = 0 | 349 count = 0 |
343 parse_json(i, count) | 350 parse_json(i, count) |
351 filename = "%s/%s_%dp_%s.info.json" % (output, i["id"], count, | |
352 sanitize(i["title"])) | |
353 json.dump(i, filename) |