# -*- coding: utf-8 -*-

# Copyright 2018-2023 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.

"""Extractors for https://www.newgrounds.com/"""

from .common import Extractor, Message
from .. import text, util, exception
from ..cache import cache
import itertools
import re


class NewgroundsExtractor(Extractor):
    """Base class for newgrounds extractors"""
    category = "newgrounds"
    directory_fmt = ("{category}", "{artist[:10]:J, }")
    filename_fmt = "{category}_{_index}_{title}.{extension}"
    archive_fmt = "{_type}{_index}"
    root = "https://www.newgrounds.com"
    cookies_domain = ".newgrounds.com"
    cookies_names = ("NG_GG_username", "vmk1du5I8m")
    request_interval = (0.5, 1.5)

    def __init__(self, match):
        Extractor.__init__(self, match)
        self.user = match.group(1)
        self.user_root = "https://{}.newgrounds.com".format(self.user)

    def _init(self):
        self.flash = self.config("flash", True)

        fmt = self.config("format")
        if not fmt or fmt == "original":
            self.format = ("mp4", "webm", "m4v", "mov", "mkv",
                           1080, 720, 360)
        elif isinstance(fmt, (list, tuple)):
            self.format = fmt
        else:
            self._video_formats = self._video_formats_limit
            self.format = (fmt if isinstance(fmt, int) else
                           text.parse_int(fmt.rstrip("p")))
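
    # Illustrative note (not part of the original source): values accepted by
    # the "format" option, as interpreted by _init() above:
    #
    #   unset / "original"     -> keep the default preference tuple
    #   720, "720", or "720p"  -> parsed to an int and handled by
    #                             _video_formats_limit() (defined elsewhere)
    #   ("webm", "mp4", 1080)  -> used verbatim as the preference sequence
    #
    # The "flash" option (default True) makes extract_post() request media
    # pages with a "/format/flash" URL suffix.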

    def items(self):
        self.login()
        metadata = self.metadata()

        for post_url in self.posts():
            try:
                post = self.extract_post(post_url)
                url = post.get("url")
            except Exception:
                self.log.debug("", exc_info=True)
                url = None

            if url:
                if metadata:
                    post.update(metadata)
                yield Message.Directory, post
                post["num"] = 0
                yield Message.Url, url, text.nameext_from_url(url, post)

                if "_multi" in post:
                    for data in post["_multi"]:
                        post["num"] += 1
                        post["_index"] = "{}_{:>02}".format(
                            post["index"], post["num"])
                        post.update(data)
                        url = data["image"]

                        text.nameext_from_url(url, post)
                        yield Message.Url, url, post

                        if "_fallback" in post:
                            del post["_fallback"]

                for url in text.extract_iter(
                        post["_comment"], 'data-smartload-src="', '"'):
                    post["num"] += 1
                    post["_index"] = "{}_{:>02}".format(
                        post["index"], post["num"])
                    url = text.ensure_http_scheme(url)
                    text.nameext_from_url(url, post)
                    yield Message.Url, url, post
            else:
                self.log.warning(
                    "Unable to get download URL for '%s'", post_url)

    def posts(self):
        """Return URLs of all relevant post pages"""
        return self._pagination(self._path)

    def metadata(self):
        """Return general metadata"""

    def login(self):
        if self.cookies_check(self.cookies_names):
            return

        username, password = self._get_auth_info()
        if username:
            self.cookies_update(self._login_impl(username, password))
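
    # Note (added for clarity, not in the original source): the @cache
    # decorator below memoizes the returned cookie dict per username for up
    # to a year (maxage=365*86400, keyarg=1), so repeated runs can reuse the
    # stored session; login() above only calls it when the session cookies
    # are missing.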

    @cache(maxage=365*86400, keyarg=1)
    def _login_impl(self, username, password):
        self.log.info("Logging in as %s", username)

        url = self.root + "/passport"
        response = self.request(url)
        if response.history and response.url.endswith("/social"):
            return self.cookies

        page = response.text
        headers = {
            "Accept": "application/json, text/javascript, */*; q=0.01",
            "Content-Type": "application/x-www-form-urlencoded; "
                            "charset=UTF-8",
            "X-Requested-With": "XMLHttpRequest",
            "Origin": self.root,
            "Referer": url,
        }
        url = text.urljoin(self.root, text.extr(page, 'action="', '"'))
        data = {
            "auth"    : text.extr(page, 'name="auth" value="', '"'),
            "remember": "1",
            "username": username,
            "password": str(password),
            "code"    : "",
            "codehint": "------",
            "mfaCheck": "1",
        }

        while True:
            response = self.request(
                url, method="POST", headers=headers, data=data)
            result = response.json()

            if result.get("success"):
                break
            if "errors" in result:
                raise exception.AuthenticationError(
                    '"' + '", "'.join(result["errors"]) + '"')

            if result.get("requiresMfa"):
                data["code"] = self.input("Verification Code: ")
                data["codehint"] = " "
            elif result.get("requiresEmailMfa"):
                email = result.get("obfuscatedEmail")
                prompt = "Email Verification Code ({}): ".format(email)
                data["code"] = self.input(prompt)
                data["codehint"] = " "

            data.pop("mfaCheck", None)

        return {
            cookie.name: cookie.value
            for cookie in response.cookies
        }

    def extract_post(self, post_url):
        url = post_url
        if "/art/view/" in post_url:
            extract_data = self._extract_image_data
        elif "/audio/listen/" in post_url:
            extract_data = self._extract_audio_data
        else:
            extract_data = self._extract_media_data
            if self.flash:
                url += "/format/flash"

        with self.request(url, fatal=False) as response:
            if response.status_code >= 400:
                return {}
            page = response.text

        pos = page.find('id="adults_only"')
        if pos >= 0:
            msg = text.extract(page, 'class="highlight">', '<', pos)[0]
            self.log.warning('"%s"', msg)

        extr = text.extract_from(page)
        data = extract_data(extr, post_url)

        data["_comment"] = extr(
            'id="author_comments"', '</div></div>').partition(">")[2]
        data["comment"] = text.unescape(text.remove_html(
            data["_comment"], "", ""))
        data["favorites"] = text.parse_int(extr(
            'id="faves_load">', '<').replace(",", ""))
        data["score"] = text.parse_float(extr('id="score_number">', '<'))
        data["tags"] = text.split_html(extr(
            '<dd class="tags">', '</dd>'))
        data["artist"] = [
            text.extr(user, '//', '.')
            for user in text.extract_iter(page, '<div class="item-user">', '>')
        ]

        data["tags"].sort()
        data["user"] = self.user or data["artist"][0]
        data["post_url"] = post_url
        return data
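
    # Illustrative summary (not part of the original source): items() above
    # relies on these keys of the dict returned by extract_post():
    #   "url"       -> main download URL
    #   "index"     -> numeric post ID used to build "_index"
    #   "_index"    -> archive/filename index ("<index>" or "<index>_<num>")
    #   "_multi"    -> optional list of extra image dicts (each with "image")
    #   "_fallback" -> optional fallback URLs, dropped after the first yield
    #   "_comment"  -> raw author-comment HTML scanned for embedded images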

    def _extract_image_data(self, extr, url):
        full = text.extract_from(util.json_loads(extr(
            '"full_image_text":', '});')))
        data = {
            "title"      : text.unescape(extr('"og:title" content="', '"')),
            "description": text.unescape(extr(':description" content="', '"')),
            "type"       : extr('og:type" content="', '"'),
            "_type"      : "i",
            "date"       : text.parse_datetime(extr(
                'itemprop="datePublished" content="', '"')),
            "rating"     : extr('class="rated-', '"'),
            "url"        : full('src="', '"'),
            "width"      : text.parse_int(full('width="', '"')),
            "height"     : text.parse_int(full('height="', '"')),
        }
        index = data["url"].rpartition("/")[2].partition("_")[0]
        data["index"] = text.parse_int(index)
        data["_index"] = index

        image_data = extr("let imageData =", "\n];")
        if image_data:
            data["_multi"] = self._extract_images_multi(image_data)
        else:
            art_images = extr('