add extractors for fantia and fanbox (#1459)

* add extractors for fantia and fanbox

* appease linter

* make docstrings unique

* [fantia] refactor post extraction

* [fantia] capitalize

* [fantia] improve regex pattern

* code style

* capitalize

* [fanbox] use BASE_PATTERN for url regexes

* [fanbox] refactor metadata and post extraction

* [fanbox] improve url base pattern

* [fanbox] accept creator page links ending with /posts

* [fanbox] more tests

* [fantia] improved pagination

* [fanbox] misc. code logic improvements

* [fantia] finish restructuring pagination code

* [fanbox] avoid making a request for each individual post when processing a creator page

* [fanbox] support embedded videos

* [fanbox] fix errors

* [fanbox] document extractor.fanbox.videos

* [fanbox] handle "article" and "entry" post types, all embeds

* [fanbox] fix downloading of embedded fanbox posts
commit e47952ac14 (parent d900edfcfb), authored by thatfuckingbird

@@ -968,6 +968,22 @@ Description
Download full-sized original images if available.
extractor.fanbox.embeds
-----------------------
Type
``bool`` or ``string``
Default
``true``
Description
Control behavior on embedded content from external sites.
* ``true``: Extract embed URLs and download them if supported
(videos are not downloaded).
* ``"ytdl"``: Like ``true``, but let `youtube-dl`_ handle video
extraction and download for YouTube, Vimeo and SoundCloud embeds.
* ``false``: Ignore embeds.
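    For example, setting ``{"extractor": {"fanbox": {"embeds": "ytdl"}}}``
    in a configuration file lets youtube-dl handle all embedded videos.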
extractor.flickr.access-token & .access-token-secret
----------------------------------------------------
Type

@@ -151,6 +151,18 @@ Consider all sites to be NSFW unless otherwise known.
<td>Chapters, Manga</td>
<td></td>
</tr>
<tr>
<td>Fanbox</td>
<td>https://www.fanbox.cc/</td>
<td>Creators, Posts</td>
<td><a href="https://github.com/mikf/gallery-dl#cookies">Cookies</a></td>
</tr>
<tr>
<td>Fantia</td>
<td>https://fantia.jp/</td>
<td>Creators, Posts</td>
<td><a href="https://github.com/mikf/gallery-dl#cookies">Cookies</a></td>
</tr>
<tr>
<td>Flickr</td>
<td>https://www.flickr.com/</td>

@@ -31,6 +31,8 @@ modules = [
"erome",
"exhentai",
"fallenangels",
"fanbox",
"fantia",
"flickr",
"furaffinity",
"fuskator",

@@ -0,0 +1,283 @@
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.fanbox.cc/"""
from .common import Extractor, Message
from .. import text
BASE_PATTERN = (
r"(?:https?://)?(?:"
r"(?!www\.)([\w-]+)\.fanbox\.cc|"
r"(?:www\.)?fanbox\.cc/@([\w-]+))"
)
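# Examples of URLs covered by BASE_PATTERN:
#   https://xub.fanbox.cc       -> creator ID in group 1 (subdomain form)
#   https://www.fanbox.cc/@xub  -> creator ID in group 2 (handle form)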
class FanboxExtractor(Extractor):
"""Base class for Fanbox extractors"""
category = "fanbox"
root = "https://www.fanbox.cc"
directory_fmt = ("{category}", "{creatorId}")
filename_fmt = "{id}_{num}.{extension}"
archive_fmt = "{id}_{num}"
_warning = True
def __init__(self, match):
Extractor.__init__(self, match)
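        # "embeds" accepts true (default), "ytdl", or false; see the
        # extractor.fanbox.embeds section in docs/configuration.rst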
self.embeds = self.config("embeds", True)
def items(self):
yield Message.Version, 1
if self._warning:
if "FANBOXSESSID" not in self.session.cookies:
self.log.warning("no 'FANBOXSESSID' cookie set")
FanboxExtractor._warning = False
for content_body, post in self.posts():
yield Message.Directory, post
yield from self._get_urls_from_post(content_body, post)
def posts(self):
"""Return all relevant post objects"""
def _pagination(self, url):
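        """Yield posts from an API endpoint, following "nextUrl" links"""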
headers = {"Origin": self.root}
while url:
url = text.ensure_http_scheme(url)
body = self.request(url, headers=headers).json()["body"]
for item in body["items"]:
yield self._process_post(item)
url = body["nextUrl"]
def _get_post_data_from_id(self, post_id):
"""Fetch and process post data"""
headers = {"Origin": self.root}
url = "https://api.fanbox.cc/post.info?postId="+post_id
post = self.request(url, headers=headers).json()["body"]
return self._process_post(post)
def _process_post(self, post):
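        """Split a post object into its content body and flat metadata"""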
content_body = post.pop("body", None)
if content_body:
if "html" in content_body:
post["html"] = content_body["html"]
if post["type"] == "article":
post["articleBody"] = content_body.copy()
post["date"] = text.parse_datetime(post["publishedDatetime"])
post["text"] = content_body.get("text") if content_body else None
post["isCoverImage"] = False
return content_body, post
def _get_urls_from_post(self, content_body, post):
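        """Yield a (type, URL, metadata) message for every file in a post"""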
num = 0
cover_image = post.get("coverImageUrl")
if cover_image:
final_post = post.copy()
final_post["isCoverImage"] = True
final_post["fileUrl"] = cover_image
text.nameext_from_url(cover_image, final_post)
final_post["num"] = num
num += 1
yield Message.Url, cover_image, final_post
if not content_body:
return
if "html" in content_body:
html_urls = []
for href in text.extract_iter(content_body["html"], 'href="', '"'):
if "fanbox.pixiv.net/images/entry" in href:
html_urls.append(href)
elif "downloads.fanbox.cc" in href:
html_urls.append(href)
for src in text.extract_iter(content_body["html"],
'data-src-original="', '"'):
html_urls.append(src)
for url in html_urls:
final_post = post.copy()
text.nameext_from_url(url, final_post)
final_post["fileUrl"] = url
final_post["num"] = num
num += 1
yield Message.Url, url, final_post
for group in ("images", "imageMap"):
if group in content_body:
for item in content_body[group]:
if group == "imageMap":
# imageMap is a dict with image objects as values
item = content_body[group][item]
final_post = post.copy()
final_post["fileUrl"] = item["originalUrl"]
text.nameext_from_url(item["originalUrl"], final_post)
if "extension" in item:
final_post["extension"] = item["extension"]
final_post["fileId"] = item.get("id")
final_post["width"] = item.get("width")
final_post["height"] = item.get("height")
final_post["num"] = num
num += 1
yield Message.Url, item["originalUrl"], final_post
for group in ("files", "fileMap"):
if group in content_body:
for item in content_body[group]:
if group == "fileMap":
# fileMap is a dict with file objects as values
item = content_body[group][item]
final_post = post.copy()
final_post["fileUrl"] = item["url"]
text.nameext_from_url(item["url"], final_post)
if "extension" in item:
final_post["extension"] = item["extension"]
if "name" in item:
final_post["filename"] = item["name"]
final_post["fileId"] = item.get("id")
final_post["num"] = num
num += 1
yield Message.Url, item["url"], final_post
if self.embeds:
embeds_found = []
if "video" in content_body:
embeds_found.append(content_body["video"])
embeds_found.extend(content_body.get("embedMap", {}).values())
for embed in embeds_found:
# embed_result is (message type, url, metadata dict)
embed_result = self._process_embed(post, embed)
if not embed_result:
continue
embed_result[2]["num"] = num
num += 1
yield embed_result
def _process_embed(self, post, embed):
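        # an embed object carries a "serviceProvider" plus a "videoId"
        # or "contentId" identifying the content on the external site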
final_post = post.copy()
provider = embed["serviceProvider"]
content_id = embed.get("videoId") or embed.get("contentId")
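        # the "ytdl:" prefix makes gallery-dl hand the URL to its
        # youtube-dl downloader instead of fetching it directly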
prefix = "ytdl:" if self.embeds == "ytdl" else ""
url = None
is_video = False
if provider == "soundcloud":
url = prefix+"https://soundcloud.com/"+content_id
is_video = True
elif provider == "youtube":
url = prefix+"https://youtube.com/watch?v="+content_id
is_video = True
elif provider == "vimeo":
url = prefix+"https://vimeo.com/"+content_id
is_video = True
elif provider == "fanbox":
# this is an old URL format that redirects
# to a proper Fanbox URL
url = "https://www.pixiv.net/fanbox/"+content_id
# resolve redirect
response = self.request(url, method="HEAD", allow_redirects=False)
url = response.headers["Location"]
final_post["_extractor"] = FanboxPostExtractor
elif provider == "twitter":
url = "https://twitter.com/_/status/"+content_id
elif provider == "google_forms":
templ = "https://docs.google.com/forms/d/e/{}/viewform?usp=sf_link"
url = templ.format(content_id)
else:
self.log.warning("service not recognized: {}".format(provider))
if url:
final_post["embed"] = embed
final_post["embedUrl"] = url
text.nameext_from_url(url, final_post)
msg_type = Message.Queue
if is_video and self.embeds == "ytdl":
msg_type = Message.Url
return msg_type, url, final_post
class FanboxCreatorExtractor(FanboxExtractor):
"""Extractor for a Fanbox creator's works"""
subcategory = "creator"
pattern = BASE_PATTERN + r"(?:/posts)?/?$"
test = (
("https://xub.fanbox.cc", {
"range": "1-15",
"count": ">= 15",
"keyword": {
"creatorId" : "xub",
"tags" : list,
"title" : str,
},
}),
("https://xub.fanbox.cc/posts"),
("https://www.fanbox.cc/@xub/"),
("https://www.fanbox.cc/@xub/posts"),
)
def __init__(self, match):
FanboxExtractor.__init__(self, match)
self.creator_id = match.group(1) or match.group(2)
def posts(self):
url = "https://api.fanbox.cc/post.listCreator?creatorId={}&limit=10"
return self._pagination(url.format(self.creator_id))
class FanboxPostExtractor(FanboxExtractor):
"""Extractor for media from a single Fanbox post"""
subcategory = "post"
pattern = BASE_PATTERN + r"/posts/(\d+)"
test = (
("https://www.fanbox.cc/@xub/posts/1910054", {
"count": 3,
"keyword": {
"title": "えま★おうがすと",
"tags": list,
"hasAdultContent": True,
"isCoverImage": False
},
}),
# entry post type, image embedded in html of the post
("https://nekoworks.fanbox.cc/posts/915", {
"count": 2,
"keyword": {
"title": "【SAYORI FAN CLUB】お届け内容",
"tags": list,
"html": str,
"hasAdultContent": True
},
}),
# article post type, imageMap, 2 twitter embeds, fanbox embed
("https://steelwire.fanbox.cc/posts/285502", {
"options": (("embeds", True),),
"count": 10,
"keyword": {
"title": "イラスト+SS義足の炭鉱少年が義足を見せてくれるだけ 【全体公開版】",
"tags": list,
"articleBody": dict,
"hasAdultContent": True
},
}),
)
def __init__(self, match):
FanboxExtractor.__init__(self, match)
self.post_id = match.group(3)
def posts(self):
return (self._get_post_data_from_id(self.post_id),)

@@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://fantia.jp/"""
from .common import Extractor, Message
from .. import text
class FantiaExtractor(Extractor):
"""Base class for Fantia extractors"""
category = "fantia"
root = "https://fantia.jp"
directory_fmt = ("{category}", "{fanclub_id}")
filename_fmt = "{post_id}_{file_id}.{extension}"
archive_fmt = "{post_id}_{file_id}"
_warning = True
def items(self):
yield Message.Version, 1
if self._warning:
if "_session_id" not in self.session.cookies:
self.log.warning("no '_session_id' cookie set")
FantiaExtractor._warning = False
for post_id in self.posts():
full_response, post = self._get_post_data(post_id)
yield Message.Directory, post
for url, url_data in self._get_urls_from_post(full_response, post):
fname = url_data["content_filename"] or url
text.nameext_from_url(fname, url_data)
url_data["file_url"] = url
yield Message.Url, url, url_data
def posts(self):
"""Return post IDs"""
def _pagination(self, url):
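        """Yield post IDs from a fanclub's paginated HTML listing"""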
params = {"page": 1}
headers = {"Referer": self.root}
while True:
page = self.request(url, params=params, headers=headers).text
post_id = None
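            # post_id doubles as a sentinel: a page without any
            # "link-block" anchors means the listing is exhausted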
for post_id in text.extract_iter(
page, 'class="link-block" href="/posts/', '"'):
yield post_id
if not post_id:
return
params["page"] += 1
def _get_post_data(self, post_id):
"""Fetch and process post data"""
headers = {"Referer": self.root}
url = self.root+"/api/v1/posts/"+post_id
resp = self.request(url, headers=headers).json()["post"]
post = {
"post_id": resp["id"],
"post_url": self.root + "/posts/" + str(resp["id"]),
"post_title": resp["title"],
"comment": resp["comment"],
"rating": resp["rating"],
"posted_at": resp["posted_at"],
"fanclub_id": resp["fanclub"]["id"],
"fanclub_user_id": resp["fanclub"]["user"]["id"],
"fanclub_user_name": resp["fanclub"]["user"]["name"],
"fanclub_name": resp["fanclub"]["name"],
"fanclub_url": self.root+"/fanclubs/"+str(resp["fanclub"]["id"]),
"tags": resp["tags"]
}
return resp, post
def _get_urls_from_post(self, resp, post):
"""Extract individual URL data from the response"""
if "thumb" in resp and resp["thumb"] and "original" in resp["thumb"]:
post["content_filename"] = ""
post["content_category"] = "thumb"
post["file_id"] = "thumb"
yield resp["thumb"]["original"], post
for content in resp["post_contents"]:
post["content_category"] = content["category"]
post["content_title"] = content["title"]
post["content_filename"] = content.get("filename", "")
post["content_id"] = content["id"]
if "post_content_photos" in content:
for photo in content["post_content_photos"]:
post["file_id"] = photo["id"]
yield photo["url"]["original"], post
if "download_uri" in content:
post["file_id"] = content["id"]
yield self.root+"/"+content["download_uri"], post
class FantiaCreatorExtractor(FantiaExtractor):
"""Extractor for a Fantia creator's works"""
subcategory = "creator"
pattern = r"(?:https?://)?(?:www\.)?fantia\.jp/fanclubs/(\d+)"
test = (
("https://fantia.jp/fanclubs/6939", {
"range": "1-25",
"count": ">= 25",
"keyword": {
"fanclub_user_id" : 52152,
"tags" : list,
"title" : str,
},
}),
)
def __init__(self, match):
FantiaExtractor.__init__(self, match)
self.creator_id = match.group(1)
def posts(self):
url = "{}/fanclubs/{}/posts".format(self.root, self.creator_id)
return self._pagination(url)
class FantiaPostExtractor(FantiaExtractor):
"""Extractor for media from a single Fantia post"""
subcategory = "post"
pattern = r"(?:https?://)?(?:www\.)?fantia\.jp/posts/(\d+)"
test = (
("https://fantia.jp/posts/508363", {
"count": 6,
"keyword": {
"post_title": "zunda逆バニーでおしりコッショリ",
"tags": list,
"rating": "adult",
"post_id": 508363
},
}),
)
def __init__(self, match):
FantiaExtractor.__init__(self, match)
self.post_id = match.group(1)
def posts(self):
return (self.post_id,)

@@ -208,6 +208,8 @@ AUTH_MAP = {
"e621" : "Supported",
"e-hentai" : "Supported",
"exhentai" : "Supported",
"fanbox" : _COOKIES,
"fantia" : _COOKIES,
"flickr" : _OAUTH,
"furaffinity" : _COOKIES,
"idolcomplex" : "Supported",
