You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
9.6 KiB

# -*- coding: utf-8 -*-
# Copyright 2019-2020 Mike Fährmann
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for"""
from .common import Extractor, Message
from .. import text, exception
from ..cache import memcache
import collections
import itertools
import json
class PatreonExtractor(Extractor):
"""Base class for patreon extractors"""
category = "patreon"
root = ""
directory_fmt = ("{category}", "{creator[full_name]}")
filename_fmt = "{id}_{title}_{num:>02}.{extension}"
archive_fmt = "{id}_{num}"
_warning = True
def items(self):
yield Message.Version, 1
if self._warning:
if "session_id" not in self.session.cookies:
self.log.warning("no 'session_id' cookie set")
PatreonExtractor._warning = False
for post in self.posts():
post["num"] = 0
hashes = set()
yield Message.Directory, post
yield Message.Metadata, text.nameext_from_url(
post["creator"].get("image_url", ""), post)
for kind, url, name in itertools.chain(
fhash = url.rsplit("/", 2)[1]
if fhash not in hashes:
post["hash"] = fhash
post["type"] = kind
post["num"] += 1
yield Message.Url, url, text.nameext_from_url(name, post)
self.log.debug("skipping %s (%s %s)", url, fhash, kind)
def _postfile(post):
postfile = post.get("post_file")
if postfile:
return (("postfile", postfile["url"], postfile["name"]),)
return ()
def _images(self, post):
for image in post["images"]:
url = image.get("download_url")
if url:
name = image.get("file_name") or self._filename(url) or url
yield "image", url, name
def _attachments(self, post):
for attachment in post["attachments"]:
url = self.request(
attachment["url"], method="HEAD",
allow_redirects=False, fatal=False,
if url:
yield "attachment", url, attachment["name"]
def _content(post):
content = post.get("content")
if content:
for img in text.extract_iter(
content, '<img data-media-id="', '>'):
url = text.extract(img, 'src="', '"')[0]
if url:
yield "content", url, url
def posts(self):
"""Return all relevant post objects"""
def _pagination(self, url):
headers = {"Referer": self.root}
while url:
if not url.startswith("http"):
url = "https://" + url.lstrip("/:")
posts = self.request(url, headers=headers).json()
if "included" in posts:
included = self._transform(posts["included"])
for post in posts["data"]:
yield self._process(post, included)
if "links" not in posts:
url = posts["links"].get("next")
def _process(self, post, included):
"""Process and extend a 'post' object"""
attr = post["attributes"]
attr["id"] = text.parse_int(post["id"])
attr["images"] = self._files(post, included, "images")
attr["attachments"] = self._files(post, included, "attachments")
attr["date"] = text.parse_datetime(
attr["published_at"], "%Y-%m-%dT%H:%M:%S.%f%z")
user = post["relationships"]["user"]
attr["creator"] = (
self._user(user["links"]["related"]) or
return attr
def _transform(included):
"""Transform 'included' into an easier to handle format"""
result = collections.defaultdict(dict)
for inc in included:
result[inc["type"]][inc["id"]] = inc["attributes"]
return result
def _files(post, included, key):
"""Build a list of files"""
files = post["relationships"].get(key)
if files and files.get("data"):
return [
for file in files["data"]
return []
def _user(self, url):
"""Fetch user information"""
response = self.request(url, fatal=False)
if response.status_code >= 400:
return None
user = response.json()["data"]
attr = user["attributes"]
attr["id"] = user["id"]
attr["date"] = text.parse_datetime(
attr["created"], "%Y-%m-%dT%H:%M:%S.%f%z")
return attr
def _filename(self, url):
"""Fetch filename from its Content-Disposition header"""
response = self.request(url, method="HEAD", fatal=False)
cd = response.headers.get("Content-Disposition")
return text.extract(cd, 'filename="', '"')[0]
def _build_url(endpoint, query):
return (
"" + endpoint +
"&fields[access_rule]=access_rule_type,amount_cents" + query +
class PatreonCreatorExtractor(PatreonExtractor):
"""Extractor for a creator's works"""
subcategory = "creator"
pattern = (r"(?:https?://)?(?:www\.)?patreon\.com"
test = (
("", {
"range": "1-25",
"count": ">= 25",
"keyword": {
"attachments" : list,
"comment_count": int,
"content" : str,
"creator" : dict,
"date" : "type:datetime",
"id" : int,
"images" : list,
"like_count" : int,
"post_type" : str,
"published_at" : str,
"title" : str,
("", {
"exception": exception.NotFoundError,
def __init__(self, match):
PatreonExtractor.__init__(self, match)
self.creator =
def posts(self):
url = "{}/{}".format(self.root, self.creator)
page = self.request(url, notfound="creator").text
campaign_id = text.extract(page, "/campaign/", "/")[0]
if not campaign_id:
raise exception.NotFoundError("creator")
url = self._build_url("posts", (
"&filter[campaign_id]=" + campaign_id
return self._pagination(url)
class PatreonUserExtractor(PatreonExtractor):
"""Extractor for media from creators supported by you"""
subcategory = "user"
pattern = r"(?:https?://)?(?:www\.)?patreon\.com/home$"
test = ("",)
def posts(self):
url = self._build_url("stream", (
return self._pagination(url)
class PatreonPostExtractor(PatreonExtractor):
"""Extractor for media from a single post"""
subcategory = "post"
pattern = r"(?:https?://)?(?:www\.)?patreon\.com/posts/([^/?&#]+)"
test = (
# postfile + attachments
("", {
"count": 4,
# postfile + content
("", {
"count": 4,
("", {
"exception": exception.NotFoundError,
def __init__(self, match):
PatreonExtractor.__init__(self, match)
self.slug =
def posts(self):
url = "{}/posts/{}".format(self.root, self.slug)
page = self.request(url, notfound="post").text
data = text.extract(page, "window.patreon.bootstrap,", "\n});")[0]
post = json.loads(data + "}")["post"]
included = self._transform(post["included"])
return (self._process(post["data"], included),)