[sankaku] use 'beta.sankakucomplex.com' API endpoints

pull/1195/head
Mike Fährmann 4 years ago
parent b3ecc89a9a
commit ecdea799dd

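Summary: instead of scraping HTML from chan.sankakucomplex.com, the extractors now query the JSON API at capi-v2.sankakucomplex.com. Below is a minimal standalone sketch of the keyset pagination that the new SankakuExtractor._pagination() performs — the endpoint path and the "tags"/"lang"/"limit"/"next" parameters are taken from the diff, while the `requests` usage and the example search tag are illustrative only:

    # Sketch of the keyset pagination used by the new API code (assumptions:
    # plain `requests`, no auth, arbitrary example tag).
    import requests

    def fetch_posts(tags, limit=100):
        url = "https://capi-v2.sankakucomplex.com/posts/keyset"
        params = {"tags": tags, "lang": "en", "limit": str(limit)}
        while True:
            data = requests.get(url, params=params).json()
            if not data.get("success", True):
                raise RuntimeError(data.get("code"))  # API reported an error
            yield from data["data"]
            params["next"] = data["meta"]["next"]  # cursor for the next batch
            if not params["next"]:
                return

    for post in fetch_posts("bonocho"):
        print(post["id"], post.get("file_url"))
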
gallery_dl/extractor/idolcomplex.py
@@ -1,26 +1,116 @@
 # -*- coding: utf-8 -*-

-# Copyright 2018-2019 Mike Fährmann
+# Copyright 2018-2020 Mike Fährmann
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 as
 # published by the Free Software Foundation.

-"""Extract images from https://idol.sankakucomplex.com/"""
+"""Extractors for https://idol.sankakucomplex.com/"""

-from . import sankaku
+from .sankaku import SankakuExtractor
+from .common import Message
+from .. import text, util, exception
+import collections
+import random
+import time
+import re


-class IdolcomplexExtractor(sankaku.SankakuExtractor):
+class IdolcomplexExtractor(SankakuExtractor):
     """Base class for idolcomplex extractors"""
     category = "idolcomplex"
     cookiedomain = "idol.sankakucomplex.com"
+    root = "https://" + cookiedomain
     subdomain = "idol"

+    def __init__(self, match):
+        SankakuExtractor.__init__(self, match)
+        self.logged_in = True
+        self.start_page = 1
+        self.start_post = 0
+        self.extags = self.config("tags", False)
+        self.wait_min = self.config("wait-min", 3.0)
+        self.wait_max = self.config("wait-max", 6.0)
+        if self.wait_max < self.wait_min:
+            self.wait_max = self.wait_min
+
+    def items(self):
+        self.login()
+        data = self.metadata()
+
+        for post_id in util.advance(self.post_ids(), self.start_post):
+            self.wait()
+            post = self._parse_post(post_id)
+            url = post["file_url"]
+            post.update(data)
+            text.nameext_from_url(url, post)
+            yield Message.Directory, post
+            yield Message.Url, url, post
+
+    def skip(self, num):
+        self.start_post += num
+        return num
+
+    def post_ids(self):
+        """Return an iterable containing all relevant post ids"""
+
+    def _parse_post(self, post_id):
+        """Extract metadata of a single post"""
+        url = self.root + "/post/show/" + post_id
+        page = self.request(url, retries=10).text
+        extr = text.extract
+
+        tags   , pos = extr(page, "<title>", " | ")
+        vavg   , pos = extr(page, "itemprop=ratingValue>", "<", pos)
+        vcnt   , pos = extr(page, "itemprop=reviewCount>", "<", pos)
+        _      , pos = extr(page, "Posted: <", "", pos)
+        created, pos = extr(page, ' title="', '"', pos)
+        rating = extr(page, "<li>Rating: ", "<", pos)[0]
+
+        file_url, pos = extr(page, '<li>Original: <a href="', '"', pos)
+        if file_url:
+            width , pos = extr(page, '>', 'x', pos)
+            height, pos = extr(page, '', ' ', pos)
+        else:
+            width , pos = extr(page, '<object width=', ' ', pos)
+            height, pos = extr(page, 'height=', '>', pos)
+            file_url = extr(page, '<embed src="', '"', pos)[0]
+
+        data = {
+            "id": text.parse_int(post_id),
+            "md5": file_url.rpartition("/")[2].partition(".")[0],
+            "tags": text.unescape(tags),
+            "vote_average": text.parse_float(vavg),
+            "vote_count": text.parse_int(vcnt),
+            "created_at": created,
+            "rating": (rating or "?")[0].lower(),
+            "file_url": "https:" + text.unescape(file_url),
+            "width": text.parse_int(width),
+            "height": text.parse_int(height),
+        }
+
+        if self.extags:
+            tags = collections.defaultdict(list)
+            tags_html = text.extract(page, '<ul id=tag-sidebar>', '</ul>')[0]
+            pattern = re.compile(r'tag-type-([^>]+)><a href="/\?tags=([^"]+)')
+            for tag_type, tag_name in pattern.findall(tags_html or ""):
+                tags[tag_type].append(text.unquote(tag_name))
+            for key, value in tags.items():
+                data["tags_" + key] = " ".join(value)
+
+        return data
+
+    def wait(self):
+        """Wait for a randomly chosen amount of seconds"""
+        time.sleep(random.uniform(self.wait_min, self.wait_max))


-class IdolcomplexTagExtractor(IdolcomplexExtractor,
-                              sankaku.SankakuTagExtractor):
+class IdolcomplexTagExtractor(IdolcomplexExtractor):
     """Extractor for images from idol.sankakucomplex.com by search-tags"""
+    subcategory = "tag"
+    directory_fmt = ("{category}", "{search_tags}")
+    archive_fmt = "t_{search_tags}_{id}"
     pattern = r"(?:https?://)?idol\.sankakucomplex\.com/\?([^#]*)"
     test = (
         ("https://idol.sankakucomplex.com/?tags=lyumos+wreath", {
@@ -31,20 +121,110 @@ class IdolcomplexTagExtractor(IdolcomplexExtractor,
         ("https://idol.sankakucomplex.com"
          "/?tags=lyumos+wreath&page=3&next=694215"),
     )
+    per_page = 20
+
+    def __init__(self, match):
+        IdolcomplexExtractor.__init__(self, match)
+        query = text.parse_query(match.group(1))
+        self.tags = text.unquote(query.get("tags", "").replace("+", " "))
+        self.start_page = text.parse_int(query.get("page"), 1)
+        self.next = text.parse_int(query.get("next"), 0)
+
+    def skip(self, num):
+        if self.next:
+            self.start_post += num
+        else:
+            pages, posts = divmod(num, self.per_page)
+            self.start_page += pages
+            self.start_post += posts
+        return num
+
+    def metadata(self):
+        if not self.next:
+            max_page = 50 if self.logged_in else 25
+            if self.start_page > max_page:
+                self.log.info("Traversing from page %d to page %d",
+                              max_page, self.start_page)
+                self.start_post += self.per_page * (self.start_page - max_page)
+                self.start_page = max_page
+
+        tags = self.tags.split()
+        if not self.logged_in and len(tags) > 4:
+            raise exception.StopExtraction(
+                "Non-members can only search up to 4 tags at once")
+        return {"search_tags": " ".join(tags)}
+
+    def post_ids(self):
+        params = {"tags": self.tags}
+
+        if self.next:
+            params["next"] = self.next
+        else:
+            params["page"] = self.start_page
+
+        while True:
+            self.wait()
+            page = self.request(self.root, params=params, retries=10).text
+            pos = page.find("<div id=more-popular-posts-link>") + 1
+
+            ids = list(text.extract_iter(page, '" id=p', '>', pos))
+            if not ids:
+                return
+            yield from ids
+
+            next_qs = text.extract(page, 'next-page-url="/?', '"', pos)[0]
+            next_id = text.parse_query(next_qs).get("next")
+
+            # stop if the same "next" parameter occurs twice in a row (#265)
+            if "next" in params and params["next"] == next_id:
+                return
+
+            params["next"] = next_id or (text.parse_int(ids[-1]) - 1)
+            params["page"] = "2"


-class IdolcomplexPoolExtractor(IdolcomplexExtractor,
-                               sankaku.SankakuPoolExtractor):
+class IdolcomplexPoolExtractor(IdolcomplexExtractor):
     """Extractor for image-pools from idol.sankakucomplex.com"""
+    subcategory = "pool"
+    directory_fmt = ("{category}", "pool", "{pool}")
+    archive_fmt = "p_{pool}_{id}"
     pattern = r"(?:https?://)?idol\.sankakucomplex\.com/pool/show/(\d+)"
     test = ("https://idol.sankakucomplex.com/pool/show/145", {
         "count": 3,
     })
+    per_page = 24
+
+    def __init__(self, match):
+        IdolcomplexExtractor.__init__(self, match)
+        self.pool_id = match.group(1)
+
+    def skip(self, num):
+        pages, posts = divmod(num, self.per_page)
+        self.start_page += pages
+        self.start_post += posts
+        return num
+
+    def metadata(self):
+        return {"pool": self.pool_id}
+
+    def post_ids(self):
+        url = self.root + "/pool/show/" + self.pool_id
+        params = {"page": self.start_page}
+
+        while True:
+            page = self.request(url, params=params, retries=10).text
+            ids = list(text.extract_iter(page, '" id=p', '>'))
+
+            yield from ids
+            if len(ids) < self.per_page:
+                return
+            params["page"] += 1


-class IdolcomplexPostExtractor(IdolcomplexExtractor,
-                               sankaku.SankakuPostExtractor):
+class IdolcomplexPostExtractor(IdolcomplexExtractor):
     """Extractor for single images from idol.sankakucomplex.com"""
+    subcategory = "post"
+    archive_fmt = "{id}"
     pattern = r"(?:https?://)?idol\.sankakucomplex\.com/post/show/(\d+)"
     test = ("https://idol.sankakucomplex.com/post/show/694215", {
         "content": "694ec2491240787d75bf5d0c75d0082b53a85afd",
@@ -57,3 +237,10 @@ class IdolcomplexPostExtractor(IdolcomplexExtractor,
             "tags_general": str,
         },
     })
+
+    def __init__(self, match):
+        IdolcomplexExtractor.__init__(self, match)
+        self.post_id = match.group(1)
+
+    def post_ids(self):
+        return (self.post_id,)

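The HTML-scraping logic dropped from sankaku.py below lives on in idolcomplex.py above. As a quick illustration of its extended-tags handling, here is a self-contained sketch of how the `tag-type-` regex from `_parse_post()` groups sidebar tags into `tags_<type>` fields; the sample HTML snippet is invented for the demonstration:

    # Sketch of the tag-sidebar grouping done by IdolcomplexExtractor
    # when the "tags" option is enabled (sample HTML is made up).
    import collections
    import re

    page = (
        '<ul id=tag-sidebar>'
        '<li class=tag-type-artist><a href="/?tags=lyumos">lyumos</a></li>'
        '<li class=tag-type-general><a href="/?tags=wreath">wreath</a></li>'
        '</ul>'
    )
    tags_html = page[page.find('<ul id=tag-sidebar>'):page.find('</ul>')]
    pattern = re.compile(r'tag-type-([^>]+)><a href="/\?tags=([^"]+)')

    tags = collections.defaultdict(list)
    for tag_type, tag_name in pattern.findall(tags_html):
        tags[tag_type].append(tag_name)

    data = {"tags_" + key: " ".join(value) for key, value in tags.items()}
    print(data)  # {'tags_artist': 'lyumos', 'tags_general': 'wreath'}
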
gallery_dl/extractor/sankaku.py
@@ -9,109 +9,56 @@
 """Extractors for https://chan.sankakucomplex.com/"""

 from .common import Extractor, Message
-from .. import text, util, exception
+from .. import text, exception
 from ..cache import cache
 import collections
-import random
-import time
-import re
+
+BASE_PATTERN = r"(?:https?://)?(?:beta|chan)\.sankakucomplex\.com"


 class SankakuExtractor(Extractor):
-    """Base class for sankaku extractors"""
+    """Base class for sankaku channel extractors"""
     basecategory = "booru"
     category = "sankaku"
     filename_fmt = "{category}_{id}_{md5}.{extension}"
     cookienames = ("login", "pass_hash")
     cookiedomain = "chan.sankakucomplex.com"
+    request_interval_min = 1.0
     subdomain = "chan"
+    per_page = 100

-    def __init__(self, match):
-        Extractor.__init__(self, match)
-        self.root = "https://" + self.cookiedomain
-        self.logged_in = True
-        self.start_page = 1
-        self.start_post = 0
-        self.extags = self.config("tags", False)
-        self.wait_min = self.config("wait-min", 3.0)
-        self.wait_max = self.config("wait-max", 6.0)
-        if self.wait_max < self.wait_min:
-            self.wait_max = self.wait_min
+    TAG_TYPES = {
+        0: "general",
+        1: "artist",
+        2: "studio",
+        3: "copyright",
+        4: "character",
+        5: "genre",
+        6: "",
+        7: "",
+        8: "medium",
+        9: "meta",
+    }

     def items(self):
+        extended_tags = self.config("tags", False)
         self.login()
+        data = self.metadata()

-        yield Message.Version, 1
-        data = self.get_metadata()
-
-        for post_id in util.advance(self.get_posts(), self.start_post):
-            self.wait()
-            post = self.get_post_data(post_id)
-            url = post["file_url"]
+        for post in self.posts():
+            try:
+                url = self._prepare_post(post, extended_tags)
+            except KeyError:
+                continue
             post.update(data)
             text.nameext_from_url(url, post)
             yield Message.Directory, post
             yield Message.Url, url, post

-    def skip(self, num):
-        self.start_post += num
-        return num
-
-    def get_metadata(self):
-        """Return general metadata"""
-        return {}
-
-    def get_posts(self):
-        """Return an iterable containing all relevant post ids"""
-
-    def get_post_data(self, post_id, extr=text.extract):
-        """Extract metadata of a single post"""
-        url = self.root + "/post/show/" + post_id
-        page = self.request(url, retries=10).text
-
-        tags   , pos = extr(page, "<title>", " | ")
-        vavg   , pos = extr(page, "itemprop=ratingValue>", "<", pos)
-        vcnt   , pos = extr(page, "itemprop=reviewCount>", "<", pos)
-        _      , pos = extr(page, "Posted: <", "", pos)
-        created, pos = extr(page, ' title="', '"', pos)
-        rating = extr(page, "<li>Rating: ", "<", pos)[0]
-
-        file_url, pos = extr(page, '<li>Original: <a href="', '"', pos)
-        if file_url:
-            width , pos = extr(page, '>', 'x', pos)
-            height, pos = extr(page, '', ' ', pos)
-        else:
-            width , pos = extr(page, '<object width=', ' ', pos)
-            height, pos = extr(page, 'height=', '>', pos)
-            file_url = extr(page, '<embed src="', '"', pos)[0]
-
-        data = {
-            "id": text.parse_int(post_id),
-            "md5": file_url.rpartition("/")[2].partition(".")[0],
-            "tags": text.unescape(tags),
-            "vote_average": text.parse_float(vavg),
-            "vote_count": text.parse_int(vcnt),
-            "created_at": created,
-            "rating": (rating or "?")[0].lower(),
-            "file_url": "https:" + text.unescape(file_url),
-            "width": text.parse_int(width),
-            "height": text.parse_int(height),
-        }
-
-        if self.extags:
-            tags = collections.defaultdict(list)
-            tags_html = text.extract(page, '<ul id=tag-sidebar>', '</ul>')[0]
-            pattern = re.compile(r'tag-type-([^>]+)><a href="/\?tags=([^"]+)')
-            for tag_type, tag_name in pattern.findall(tags_html or ""):
-                tags[tag_type].append(text.unquote(tag_name))
-            for key, value in tags.items():
-                data["tags_" + key] = " ".join(value)
-
-        return data
-
-    def wait(self):
-        """Wait for a randomly chosen amount of seconds"""
-        time.sleep(random.uniform(self.wait_min, self.wait_max))
+    def metadata(self):
+        return ()
+
+    def posts(self):
+        return ()

     def login(self):
         """Login and set necessary cookies"""
@@ -128,12 +75,13 @@ class SankakuExtractor(Extractor):
     def _login_impl(self, usertuple, password):
         username = usertuple[0]
         self.log.info("Logging in as %s", username)
+
         url = self.root + "/user/authenticate"
         data = {
-            "url": "",
-            "user[name]": username,
+            "url"           : "",
+            "user[name]"    : username,
             "user[password]": password,
-            "commit": "Login",
+            "commit"        : "Login",
         }
         response = self.request(url, method="POST", data=data)
@@ -142,27 +90,62 @@ class SankakuExtractor(Extractor):
         cookies = response.history[0].cookies
         return {c: cookies[c] for c in self.cookienames}

+    def _prepare_post(self, post, extended_tags=False):
+        url = post["file_url"]
+        if url[0] == "/":
+            url = self.root + url
+        if extended_tags:
+            self._fetch_extended_tags(post)
+        post["date"] = text.parse_timestamp(post["created_at"]["s"])
+        post["tags"] = [tag["name"] for tag in post["tags"]]
+        return url
+
+    def _fetch_extended_tags(self, post):
+        tags = collections.defaultdict(list)
+        types = self.TAG_TYPES
+        for tag in post["tags"]:
+            tags[types[tag["type"]]].append(tag["name"])
+        for key, value in tags.items():
+            post["tags_" + key] = value
+
+    def _api_request(self, endpoint, params=None):
+        url = "https://capi-v2.sankakucomplex.com" + endpoint
+        while True:
+            response = self.request(url, params=params, fatal=False)
+            if response.status_code == 429:
+                self.wait(until=response.headers.get("X-RateLimit-Reset"))
+                continue
+            return response.json()
+
+    def _pagination(self, params):
+        params["lang"] = "en"
+        params["limit"] = str(self.per_page)
+
+        while True:
+            data = self._api_request("/posts/keyset", params)
+            if not data.get("success", True):
+                raise exception.StopExtraction(data.get("code"))
+            yield from data["data"]
+
+            params["next"] = data["meta"]["next"]
+            if not params["next"]:
+                return
+            if "page" in params:
+                del params["page"]


 class SankakuTagExtractor(SankakuExtractor):
     """Extractor for images from chan.sankakucomplex.com by search-tags"""
     subcategory = "tag"
     directory_fmt = ("{category}", "{search_tags}")
     archive_fmt = "t_{search_tags}_{id}"
-    pattern = r"(?:https?://)?chan\.sankakucomplex\.com/\?([^#]*)"
+    pattern = BASE_PATTERN + r"/\?([^#]*)"
     test = (
-        ("https://chan.sankakucomplex.com/?tags=bonocho", {
+        ("https://beta.sankakucomplex.com/?tags=bonocho", {
             "count": 5,
             "pattern": r"https://c?s\.sankakucomplex\.com/data/[^/]{2}/[^/]{2}"
                        r"/[^/]{32}\.\w+\?e=\d+&m=[^&#]+",
         }),
-        # respect 'page' query parameter
-        ("https://chan.sankakucomplex.com/?tags=bonocho&page=2", {
-            "count": 0,
-        }),
-        # respect 'next' query parameter
-        ("https://chan.sankakucomplex.com/?tags=bonocho&next=182284", {
-            "count": 1,
-        }),
         # error on five or more tags
         ("https://chan.sankakucomplex.com/?tags=bonocho+a+b+c+d", {
             "options": (("username", None),),
@@ -172,128 +155,69 @@ class SankakuTagExtractor(SankakuExtractor):
         ("https://chan.sankakucomplex.com"
         "/?tags=marie_rose&page=98&next=3874906&commit=Search"),
     )
-    per_page = 20

     def __init__(self, match):
         SankakuExtractor.__init__(self, match)
         query = text.parse_query(match.group(1))
         self.tags = text.unquote(query.get("tags", "").replace("+", " "))
-        self.start_page = text.parse_int(query.get("page"), 1)
-        self.next = text.parse_int(query.get("next"), 0)

-    def skip(self, num):
-        if self.next:
-            self.start_post += num
-        else:
-            pages, posts = divmod(num, self.per_page)
-            self.start_page += pages
-            self.start_post += posts
-        return num
-
-    def get_metadata(self):
-        if not self.next:
-            max_page = 50 if self.logged_in else 25
-            if self.start_page > max_page:
-                self.log.info("Traversing from page %d to page %d",
-                              max_page, self.start_page)
-                self.start_post += self.per_page * (self.start_page - max_page)
-                self.start_page = max_page
-
-        tags = self.tags.split()
-        if not self.logged_in and len(tags) > 4:
-            raise exception.StopExtraction(
-                "Unauthenticated users cannot use more than 4 tags at once.")
-        return {"search_tags": " ".join(tags)}
-
-    def get_posts(self):
-        params = {"tags": self.tags}
-
-        if self.next:
-            params["next"] = self.next
-        else:
-            params["page"] = self.start_page
-
-        while True:
-            self.wait()
-            page = self.request(self.root, params=params, retries=10).text
-            pos = page.find("<div id=more-popular-posts-link>") + 1
-
-            ids = list(text.extract_iter(page, '" id=p', '>', pos))
-            if not ids:
-                return
-            yield from ids
-
-            next_qs = text.extract(page, 'next-page-url="/?', '"', pos)[0]
-            next_id = text.parse_query(next_qs).get("next")
-
-            # stop if the same "next" parameter occurs twice in a row (#265)
-            if "next" in params and params["next"] == next_id:
-                return
-
-            params["next"] = next_id or (text.parse_int(ids[-1]) - 1)
-            params["page"] = "2"
+    def metadata(self):
+        return {"search_tags": self.tags}
+
+    def posts(self):
+        return self._pagination({"tags": self.tags})


 class SankakuPoolExtractor(SankakuExtractor):
-    """Extractor for image-pools from chan.sankakucomplex.com"""
+    """Extractor for image pools or books from chan.sankakucomplex.com"""
     subcategory = "pool"
-    directory_fmt = ("{category}", "pool", "{pool}")
+    directory_fmt = ("{category}", "pool", "{pool[id]} {pool[name_en]}")
     archive_fmt = "p_{pool}_{id}"
-    pattern = r"(?:https?://)?chan\.sankakucomplex\.com/pool/show/(\d+)"
-    test = ("https://chan.sankakucomplex.com/pool/show/90", {
-        "count": 5,
-    })
-    per_page = 24
+    pattern = BASE_PATTERN + r"/(?:books|pool/show)/(\d+)"
+    test = (
+        ("https://beta.sankakucomplex.com/books/90", {
+            "count": 5,
+        }),
+        ("https://chan.sankakucomplex.com/pool/show/90"),
+    )

     def __init__(self, match):
         SankakuExtractor.__init__(self, match)
         self.pool_id = match.group(1)

-    def skip(self, num):
-        pages, posts = divmod(num, self.per_page)
-        self.start_page += pages
-        self.start_post += posts
-        return num
-
-    def get_metadata(self):
-        return {"pool": self.pool_id}
-
-    def get_posts(self):
-        url = self.root + "/pool/show/" + self.pool_id
-        params = {"page": self.start_page}
-
-        while True:
-            page = self.request(url, params=params, retries=10).text
-            ids = list(text.extract_iter(page, '" id=p', '>'))
-
-            yield from ids
-            if len(ids) < self.per_page:
-                return
-            params["page"] += 1
+    def metadata(self):
+        pool = self._api_request("/pools/" + self.pool_id)
+        self._posts = pool.pop("posts")
+        return {"pool": pool}
+
+    def posts(self):
+        return self._posts


 class SankakuPostExtractor(SankakuExtractor):
     """Extractor for single images from chan.sankakucomplex.com"""
     subcategory = "post"
     archive_fmt = "{id}"
-    pattern = r"(?:https?://)?chan\.sankakucomplex\.com/post/show/(\d+)"
-    test = ("https://chan.sankakucomplex.com/post/show/360451", {
-        "content": "5e255713cbf0a8e0801dc423563c34d896bb9229",
-        "options": (("tags", True),),
-        "keyword": {
-            "tags_artist": "bonocho",
-            "tags_studio": "dc_comics",
-            "tags_medium": "sketch copyright_name",
-            "tags_copyright": str,
-            "tags_character": str,
-            "tags_general": str,
-        },
-    })
+    pattern = BASE_PATTERN + r"/post/show/(\d+)"
+    test = (
+        ("https://beta.sankakucomplex.com/post/show/360451", {
+            "content": "5e255713cbf0a8e0801dc423563c34d896bb9229",
+            "options": (("tags", True),),
+            "keyword": {
+                "tags_artist": ["bonocho"],
+                "tags_studio": ["dc_comics"],
+                "tags_medium": ["sketch", "copyright_name"],
+                "tags_copyright": list,
+                "tags_character": list,
+                "tags_general" : list,
+            },
+        }),
+        ("https://chan.sankakucomplex.com/post/show/360451"),
+    )

     def __init__(self, match):
         SankakuExtractor.__init__(self, match)
         self.post_id = match.group(1)

-    def get_posts(self):
-        return (self.post_id,)
+    def posts(self):
+        return self._pagination({"tags": "id:" + self.post_id})

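For reference, a sketch of what the new `_prepare_post()`/`_fetch_extended_tags()` pair does to a single API post object. The sample `post` dict is invented, but the field names (`file_url`, `created_at["s"]`, the numeric tag `type` codes) follow the diff above; `datetime.utcfromtimestamp` stands in for gallery-dl's `text.parse_timestamp`:

    # Sketch of post preparation on an invented capi-v2 post object.
    import collections
    import datetime

    # Numeric tag-type codes -> "tags_<suffix>" keys, copied from TAG_TYPES above.
    TAG_TYPES = {0: "general", 1: "artist", 2: "studio", 3: "copyright",
                 4: "character", 5: "genre", 6: "", 7: "", 8: "medium", 9: "meta"}

    post = {
        "file_url": "/data/ab/cd/abcdef0123456789abcdef0123456789.jpg",
        "created_at": {"s": 1262726380},
        "tags": [{"name": "bonocho", "type": 1}, {"name": "sketch", "type": 8}],
    }

    # _prepare_post(): resolve relative file URLs against the site root
    url = post["file_url"]
    if url[0] == "/":
        url = "https://chan.sankakucomplex.com" + url

    # _fetch_extended_tags(): group tag names by their type code
    tags = collections.defaultdict(list)
    for tag in post["tags"]:
        tags[TAG_TYPES[tag["type"]]].append(tag["name"])
    for key, value in tags.items():
        post["tags_" + key] = value

    # parse the epoch timestamp and flatten the tag list, as _prepare_post() does
    post["date"] = datetime.datetime.utcfromtimestamp(post["created_at"]["s"])
    post["tags"] = [tag["name"] for tag in post["tags"]]

    print(url)
    print(post["tags_artist"])   # ['bonocho']
    print(post["tags_medium"])   # ['sketch']
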