# -*- coding: utf-8 -*- # Copyright 2019 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Extractors for https://www.patreon.com/""" from .common import Extractor, Message from .. import text, exception from ..cache import memcache import collections import json class PatreonExtractor(Extractor): """Base class for patreon extractors""" category = "patreon" root = "https://www.patreon.com" directory_fmt = ("{category}", "{creator[full_name]}") filename_fmt = "{id}_{title}_{num:>02}.{extension}" archive_fmt = "{id}_{num}" _warning = True def items(self): yield Message.Version, 1 if self._warning: if "session_id" not in self.session.cookies: self.log.warning("no 'session_id' cookie set") PatreonExtractor._warning = False for post in self.posts(): ids = set() post["num"] = 0 content = post.get("content") postfile = post.get("post_file") yield Message.Directory, post yield Message.Metadata, text.nameext_from_url( post["creator"].get("image_url", ""), post) for image in post["images"]: url = image.get("download_url") if not url: continue ids.add(url.split("/")[-2]) name = image.get("file_name") or self._filename(url) or url post["num"] += 1 post["type"] = "image" yield Message.Url, url, text.nameext_from_url(name, post) if postfile and postfile["url"].split("/")[-2] not in ids: post["num"] += 1 post["type"] = "postfile" text.nameext_from_url(postfile["name"], post) yield Message.Url, postfile["url"], post for attachment in post["attachments"]: post["num"] += 1 post["type"] = "attachment" text.nameext_from_url(attachment["name"], post) yield Message.Url, attachment["url"], post if content: for url in text.extract_iter(content, 'src="', '"'): post["num"] += 1 post["type"] = "content" yield Message.Url, url, text.nameext_from_url(url, post) def posts(self): """Return all relevant post objects""" def _pagination(self, url): headers = {"Referer": self.root} while url: if not url.startswith("http"): url = "https://" + url.lstrip("/:") posts = self.request(url, headers=headers).json() if "included" in posts: included = self._transform(posts["included"]) for post in posts["data"]: yield self._process(post, included) if "links" not in posts: return url = posts["links"].get("next") def _process(self, post, included): """Process and extend a 'post' object""" attr = post["attributes"] attr["id"] = text.parse_int(post["id"]) attr["images"] = self._files(post, included, "images") attr["attachments"] = self._files(post, included, "attachments") attr["date"] = text.parse_datetime( attr["published_at"], "%Y-%m-%dT%H:%M:%S.%f%z") user = post["relationships"]["user"] attr["creator"] = ( self._user(user["links"]["related"]) or included["user"][user["data"]["id"]]) return attr @staticmethod def _transform(included): """Transform 'included' into an easier to handle format""" result = collections.defaultdict(dict) for inc in included: result[inc["type"]][inc["id"]] = inc["attributes"] return result @staticmethod def _files(post, included, key): """Build a list of files""" files = post["relationships"].get(key) if files and files.get("data"): return [ included[file["type"]][file["id"]] for file in files["data"] ] return [] @memcache(keyarg=1) def _user(self, url): """Fetch user information""" response = self.request(url, fatal=False) if response.status_code >= 400: return None user = response.json()["data"] attr = user["attributes"] attr["id"] = user["id"] attr["date"] = text.parse_datetime( attr["created"], "%Y-%m-%dT%H:%M:%S.%f%z") return attr def _filename(self, url): """Fetch filename from its Content-Disposition header""" response = self.request(url, method="HEAD", fatal=False) cd = response.headers.get("Content-Disposition") return text.extract(cd, 'filename="', '"')[0] @staticmethod def _build_url(endpoint, query): return ( "https://www.patreon.com/api/" + endpoint + "?include=user,images,attachments,user_defined_tags,campaign,poll." "choices,poll.current_user_responses.user,poll.current_user_respon" "ses.choice,poll.current_user_responses.poll,access_rules.tier.nul" "l" "&fields[post]=change_visibility_at,comment_count,content,current_" "user_can_delete,current_user_can_view,current_user_has_liked,embe" "d,image,is_paid,like_count,min_cents_pledged_to_view,post_file,pu" "blished_at,patron_count,patreon_url,post_type,pledge_url,thumbnai" "l_url,teaser_text,title,upgrade_url,url,was_posted_by_campaign_ow" "ner" "&fields[user]=image_url,full_name,url" "&fields[campaign]=avatar_photo_url,earnings_visibility,is_nsfw,is" "_monthly,name,url" "&fields[access_rule]=access_rule_type,amount_cents" + query + "&json-api-use-default-includes=false" "&json-api-version=1.0" ) class PatreonCreatorExtractor(PatreonExtractor): """Extractor for a creator's works""" subcategory = "creator" pattern = (r"(?:https?://)?(?:www\.)?patreon\.com" r"/(?!(?:home|join|posts|login|signup)(?:$|[/?&#]))" r"([^/?&#]+)/?") test = ( ("https://www.patreon.com/koveliana", { "range": "1-25", "count": ">= 25", "keyword": { "attachments" : list, "comment_count": int, "content" : str, "creator" : dict, "date" : "type:datetime", "id" : int, "images" : list, "like_count" : int, "post_type" : str, "published_at" : str, "title" : str, }, }), ("https://www.patreon.com/kovelianot", { "exception": exception.NotFoundError, }), ) def __init__(self, match): PatreonExtractor.__init__(self, match) self.creator = match.group(1).lower() def posts(self): url = "{}/{}".format(self.root, self.creator) page = self.request(url, notfound="creator").text campaign_id = text.extract(page, "/campaign/", "/")[0] if not campaign_id: raise exception.NotFoundError("creator") url = self._build_url("posts", ( "&sort=-published_at" "&filter[is_draft]=false" "&filter[contains_exclusive_posts]=true" "&filter[campaign_id]=" + campaign_id )) return self._pagination(url) class PatreonUserExtractor(PatreonExtractor): """Extractor for media from creators supported by you""" subcategory = "user" pattern = r"(?:https?://)?(?:www\.)?patreon\.com/home$" test = ("https://www.patreon.com/home",) def posts(self): url = self._build_url("stream", ( "&page[cursor]=null" "&filter[is_following]=true" )) return self._pagination(url) class PatreonPostExtractor(PatreonExtractor): """Extractor for media from a single post""" subcategory = "post" pattern = r"(?:https?://)?(?:www\.)?patreon\.com/posts/([^/?&#]+)" test = ( ("https://www.patreon.com/posts/precious-metal-23563293", { "count": 4, }), ("https://www.patreon.com/posts/er1-28201153", { "count": 1, }), ("https://www.patreon.com/posts/not-found-123", { "exception": exception.NotFoundError, }), ) def __init__(self, match): PatreonExtractor.__init__(self, match) self.slug = match.group(1) def posts(self): url = "{}/posts/{}".format(self.root, self.slug) page = self.request(url, notfound="post").text data = text.extract(page, "window.patreon.bootstrap,", "\n});")[0] post = json.loads(data + "}")["post"] included = self._transform(post["included"]) return (self._process(post["data"], included),)