# -*- coding: utf-8 -*- # Copyright 2016-2019 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Extract images from https://twitter.com/""" from .common import Extractor, Message from .. import text, exception from ..cache import cache, memcache import json import re class TwitterExtractor(Extractor): """Base class for twitter extractors""" category = "twitter" directory_fmt = ("{category}", "{user[name]}") filename_fmt = "{tweet_id}_{num}.{extension}" archive_fmt = "{tweet_id}_{retweet_id}_{num}" root = "https://twitter.com" sizes = (":orig", ":large", ":medium", ":small") def __init__(self, match): Extractor.__init__(self, match) self.user = match.group(1) self._user_dict = None self.logged_in = False self.retweets = self.config("retweets", True) self.content = self.config("content", False) self.videos = self.config("videos", False) if self.content: self._emoji_sub = re.compile( r']*>').sub def items(self): self.login() metadata = self.metadata() yield Message.Version, 1 for tweet in self.tweets(): data = self._data_from_tweet(tweet) if not data or not self.retweets and data["retweet_id"]: continue data.update(metadata) if self.videos and "-videoContainer" in tweet: yield Message.Directory, data if self.videos == "ytdl": data["extension"] = None url = "ytdl:{}/{}/status/{}".format( self.root, data["user"], data["tweet_id"]) else: url = self._video_from_tweet(data["tweet_id"]) ext = text.ext_from_url(url) if ext == "m3u8": url = "ytdl:" + url data["extension"] = "mp4" data["_ytdl_extra"] = {"protocol": "m3u8_native"} else: data["extension"] = ext data["num"] = 1 yield Message.Url, url, data elif "data-image-url=" in tweet: yield Message.Directory, data images = text.extract_iter( tweet, 'data-image-url="', '"') for data["num"], url in enumerate(images, 1): text.nameext_from_url(url, data) urls = [url + size for size in self.sizes] yield Message.Urllist, urls, data def metadata(self): """Return general metadata""" return {} def tweets(self): """Yield HTML content of all relevant tweets""" def login(self): username, password = self._get_auth_info() if username: self._update_cookies(self._login_impl(username, password)) self.logged_in = True @cache(maxage=360*24*3600, keyarg=1) def _login_impl(self, username, password): self.log.info("Logging in as %s", username) page = self.request(self.root + "/login").text pos = page.index('name="authenticity_token"') token = text.extract(page, 'value="', '"', pos-80)[0] url = self.root + "/sessions" data = { "session[username_or_email]": username, "session[password]" : password, "authenticity_token" : token, "ui_metrics" : '{"rf":{},"s":""}', "scribe_log" : "", "redirect_after_login" : "", "remember_me" : "1", } response = self.request(url, method="POST", data=data) if "/error" in response.url: raise exception.AuthenticationError() return self.session.cookies def _data_from_tweet(self, tweet): extr = text.extract_from(tweet) data = { "tweet_id" : text.parse_int(extr('data-tweet-id="' , '"')), "retweet_id": text.parse_int(extr('data-retweet-id="', '"')), "retweeter" : extr('data-retweeter="' , '"'), "author" : { "name" : extr('data-screen-name="', '"'), "nick" : text.unescape(extr('data-name="' , '"')), "id" : text.parse_int(extr('data-user-id="' , '"')), }, } if not self._user_dict: if data["retweet_id"]: for user in json.loads(text.unescape(extr( 'data-reply-to-users-json="', '"'))): if user["screen_name"] == data["retweeter"]: break else: self.log.warning("Unable to extract user info") return None self._user_dict = { "name": user["screen_name"], "nick": text.unescape(user["name"]), "id" : text.parse_int(user["id_str"]), } else: self._user_dict = data["author"] data["user"] = self._user_dict data["date"] = text.parse_timestamp(extr('data-time="', '"')) if self.content: content = extr('
', '\n
') if '= max_position: return params["max_position"] = max_position = position class TwitterTimelineExtractor(TwitterExtractor): """Extractor for all images from a user's timeline""" subcategory = "timeline" pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com" r"/(?!search)([^/?&#]+)/?(?:$|[?#])") test = ( ("https://twitter.com/supernaturepics", { "range": "1-40", "url": "0106229d408f4111d9a52c8fd2ad687f64842aa4", "keyword": "37f4d35affd733d458d3b235b4a55f619a86f794", }), ("https://mobile.twitter.com/supernaturepics?p=i"), ) def tweets(self): url = "{}/i/profiles/show/{}/timeline/tweets".format( self.root, self.user) return self._tweets_from_api(url) class TwitterMediaExtractor(TwitterExtractor): """Extractor for all images from a user's Media Tweets""" subcategory = "media" pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com" r"/(?!search)([^/?&#]+)/media(?!\w)") test = ( ("https://twitter.com/supernaturepics/media", { "range": "1-40", "url": "0106229d408f4111d9a52c8fd2ad687f64842aa4", }), ("https://mobile.twitter.com/supernaturepics/media#t"), ) def tweets(self): url = "{}/i/profiles/show/{}/media_timeline".format( self.root, self.user) return self._tweets_from_api(url) class TwitterSearchExtractor(TwitterExtractor): """Extractor for all images from a search timeline""" subcategory = "search" directory_fmt = ("{category}", "Search", "{search}") pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com" r"/search/?\?(?:[^&#]+&)*q=([^&#]+)") test = ("https://twitter.com/search?q=nature", { "range": "1-40", "count": 40, }) def metadata(self): return {"search": self.user} def tweets(self): url = "{}/i/search/timeline?f=tweets&q={}".format( self.root, self.user) return self._tweets_from_api(url, "-1") class TwitterTweetExtractor(TwitterExtractor): """Extractor for images from individual tweets""" subcategory = "tweet" pattern = (r"(?:https?://)?(?:www\.|mobile\.)?twitter\.com" r"/([^/?&#]+|i/web)/status/(\d+)") test = ( ("https://twitter.com/supernaturepics/status/604341487988576256", { "url": "0e801d2f98142dd87c3630ded9e4be4a4d63b580", "keyword": "3fa3623e8d9a204597238e2f1f6433da19c63b4a", "content": "ab05e1d8d21f8d43496df284d31e8b362cd3bcab", }), # 4 images ("https://twitter.com/perrypumas/status/894001459754180609", { "url": "c8a262a9698cb733fb27870f5a8f75faf77d79f6", "keyword": "49165725116ac52193a3861e8f5534e47a706b62", }), # video ("https://twitter.com/perrypumas/status/1065692031626829824", { "options": (("videos", True),), "pattern": r"ytdl:https://video.twimg.com/ext_tw_video/.*.m3u8", }), # content with emoji, newlines, hashtags (#338) ("https://twitter.com/yumi_san0112/status/1151144618936823808", { "options": (("content", True),), "keyword": "0b7a3d05607b480c1412dfd85f8606478313e7bf", }), # Reply to another tweet (#403) ("https://twitter.com/tyson_hesse/status/1103767554424598528", { "options": (("videos", "ytdl"),), "pattern": r"ytdl:https://twitter.com/.+/1103767554424598528", }), # /i/web/ URL ("https://twitter.com/i/web/status/1155074198240292865", { "pattern": r"https://pbs.twimg.com/media/EAel0vUUYAAZ4Bq.jpg:orig", }), ) def __init__(self, match): TwitterExtractor.__init__(self, match) self.tweet_id = match.group(2) def tweets(self): url = "{}/i/web/status/{}".format(self.root, self.tweet_id) cookies = {"app_shell_visited": "1"} headers = { "Referer" : url, "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; " "Trident/7.0; rv:11.0) like Gecko", } response = self.request(url, cookies=cookies, headers=headers) if response.history and response.url == self.root + "/": raise exception.AuthorizationError() page = response.text end = page.index('class="js-tweet-stats-container') beg = page.rindex('