You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gallery-dl/gallery_dl/extractor/flickr.py

429 lines
15 KiB

# -*- coding: utf-8 -*-
# Copyright 2017-2018 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extract images from https://www.flickr.com/"""
from .common import Extractor, Message
from .. import text, oauth, util, exception
class FlickrExtractor(Extractor):
"""Base class for flickr extractors"""
category = "flickr"
filename_fmt = "{category}_{id}.{extension}"
def __init__(self, match):
Extractor.__init__(self)
self.api = FlickrAPI(self)
self.item_id = match.group(1)
self.user = None
self.load_extra = self.config("metadata", False)
def items(self):
info = self.data()
yield Message.Version, 1
yield Message.Directory, info
for photo in self.photos():
photo.update(info)
url = photo["photo"]["source"]
yield Message.Url, url, text.nameext_from_url(url, photo)
def data(self):
self.user = self.api.urls_lookupUser(self.item_id)
return {"user": self.user}
def photos(self):
return []
class FlickrImageExtractor(FlickrExtractor):
"""Extractor for individual images from flickr.com"""
subcategory = "image"
archive_fmt = "{id}"
pattern = [r"(?:https?://)?(?:www\.|m\.)?flickr\.com/photos/[^/]+/(\d+)",
r"(?:https?://)?[^.]+\.static\.?flickr\.com/(?:\d+/)+(\d+)_",
r"(?:https?://)?flic\.kr/(p)/([A-Za-z1-9]+)"]
test = [
("https://www.flickr.com/photos/departingyyz/16089302239", {
"url": "7f0887f5953f61c8b79a695cb102ea309c0346b0",
"keyword": "5ecdaf0192802451b7daca9b81f393f207ff7ee9",
"content": "6aaad7512d335ca93286fe2046e7fe3bb93d808e",
}),
("http://c2.staticflickr.com/2/1475/24531000464_9a7503ae68_b.jpg", {
"url": "40f5163488522ca5d918750ed7bd7fcf437982fe"}),
("https://farm2.static.flickr.com/1035/1188352415_cb139831d0.jpg", {
"url": "ef217b4fdcb148a0cc9eae44b9342d4a65f6d697"}),
("https://flic.kr/p/FPVo9U", {
"url": "92c54a00f31040c349cb2abcb1b9abe30cc508ae"}),
("https://www.flickr.com/photos/zzz/16089302238", {
"exception": exception.NotFoundError}),
]
def __init__(self, match):
FlickrExtractor.__init__(self, match)
if self.item_id == "p":
alphabet = ("123456789abcdefghijkmnopqrstu"
"vwxyzABCDEFGHJKLMNPQRSTUVWXYZ")
self.item_id = util.bdecode(match.group(2), alphabet)
def items(self):
size = self.api.photos_getSizes(self.item_id)[-1]
if self.load_extra:
info = self.api.photos_getInfo(self.item_id)
self._clean(info)
else:
info = {"id": self.item_id}
info["photo"] = size
url = size["source"]
text.nameext_from_url(url, info)
yield Message.Version, 1
yield Message.Directory, info
yield Message.Url, url, info
@staticmethod
def _clean(photo):
del photo["comments"]
del photo["views"]
photo["title"] = photo["title"]["_content"]
photo["tags"] = [t["raw"] for t in photo["tags"]["tag"]]
if "location" in photo:
location = photo["location"]
for key, value in location.items():
if isinstance(value, dict):
location[key] = value["_content"]
class FlickrAlbumExtractor(FlickrExtractor):
"""Extractor for photo albums from flickr.com"""
subcategory = "album"
directory_fmt = ["{category}", "{subcategory}s",
"{album[id]} - {album[title]}"]
archive_fmt = "a_{album[id]}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/"
r"photos/([^/]+)/(?:album|set)s/(\d+)"]
test = [(("https://www.flickr.com/photos/"
"shona_s/albums/72157633471741607"), {
"url": "baf4a3d1b15afcecf9638000a12c0eb3d5df9024",
"keyword": "3a99f962f30691dc1b2da46be56fe1b7768fe707",
})]
def __init__(self, match):
FlickrExtractor.__init__(self, match)
self.album_id = match.group(2)
def data(self):
self._generator = self.api.photosets_getPhotos(self.album_id)
self._first = next(self._generator)
photoset = self._first.copy()
del photoset["photo"]
return {"album": photoset}
def photos(self):
for photo in self._photos():
self.api._extract_format(photo)
yield photo
def _photos(self):
yield from self._first["photo"]
for photoset in self._generator:
yield from photoset["photo"]
class FlickrGalleryExtractor(FlickrExtractor):
"""Extractor for photo galleries from flickr.com"""
subcategory = "gallery"
directory_fmt = ["{category}", "galleries",
"{user[username]} {gallery[id]}"]
archive_fmt = "g_{gallery[id]}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/"
r"photos/([^/]+)/galleries/(\d+)"]
test = [(("https://www.flickr.com/photos/flickr/"
"galleries/72157681572514792/"), {
"url": "1e0e300fa5fe8c49ba5dfa7ccca0cb0da8a04f93",
"keyword": "ba1f0e4bf5ee4e10071bdc272c19f015985cf055",
})]
def __init__(self, match):
FlickrExtractor.__init__(self, match)
self.gallery_id = match.group(2)
def data(self):
info = FlickrExtractor.data(self)
if self.load_extra:
info["gallery"] = self.api.galleries_getInfo(self.gallery_id)
else:
info["gallery"] = {"id": self.gallery_id}
return info
def photos(self):
return self.api.galleries_getPhotos(self.gallery_id)
class FlickrGroupExtractor(FlickrExtractor):
"""Extractor for group pools from flickr.com"""
subcategory = "group"
directory_fmt = ["{category}", "{subcategory}s", "{group[groupname]}"]
archive_fmt = "G_{group[nsid]}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/groups/([^/]+)"]
test = [("https://www.flickr.com/groups/bird_headshots/", {
"pattern": (r"https?://farm\d+\.staticflickr\.com"
r"/\d+/\d+_[0-9a-f]+(_[a-z])?\.[a-z]+"),
})]
def data(self):
self.group = self.api.urls_lookupGroup(self.item_id)
return {"group": self.group}
def photos(self):
return self.api.groups_pools_getPhotos(self.group["nsid"])
class FlickrUserExtractor(FlickrExtractor):
"""Extractor for the photostream of a flickr user"""
subcategory = "user"
directory_fmt = ["{category}", "{user[username]}"]
archive_fmt = "u_{user[nsid]}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/photos/([^/]+)/?$"]
test = [("https://www.flickr.com/photos/shona_s/", {
"url": "d125b536cd8c4229363276b6c84579c394eec3a2",
"keyword": "2cdeae22cd9c3ff19ce905215f3782a7494d8264",
})]
def photos(self):
return self.api.people_getPhotos(self.user["nsid"])
class FlickrFavoriteExtractor(FlickrExtractor):
"""Extractor for favorite photos of a flickr user"""
subcategory = "favorite"
directory_fmt = ["{category}", "{subcategory}s", "{user[username]}"]
archive_fmt = "f_{user[nsid]}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/photos/([^/]+)/favorites"]
test = [("https://www.flickr.com/photos/shona_s/favorites", {
"url": "5129b3f5bfa83cc25bdae3ce476036de1488dad2",
"keyword": "0e1c9521b6051411b585c9b41a4dc0bcde20e616",
})]
def photos(self):
return self.api.favorites_getList(self.user["nsid"])
class FlickrSearchExtractor(FlickrExtractor):
"""Extractor for flickr photos based on search results"""
subcategory = "search"
directory_fmt = ["{category}", "{subcategory}", "{search[text]}"]
archive_fmt = "s_{search}_{id}"
pattern = [r"(?:https?://)?(?:www\.)?flickr\.com/search/?\?([^#]+)"]
test = [
(("https://flickr.com/search/?text=mountain"), None),
(("https://flickr.com/search/?text=tree%20cloud%20house"
"&color_codes=4&styles=minimalism"), None),
]
def __init__(self, match):
FlickrExtractor.__init__(self, match)
self.search = text.parse_query(match.group(1))
if "text" not in self.search:
self.search["text"] = ""
def data(self):
return {"search": self.search}
def photos(self):
return self.api.photos_search(self.search)
class FlickrAPI(oauth.OAuth1API):
"""Minimal interface for the flickr API"""
API_URL = "https://api.flickr.com/services/rest/"
API_KEY = "ac4fd7aa98585b9eee1ba761c209de68"
API_SECRET = "3adb0f568dc68393"
FORMATS = [
("o", "Original", None),
("k", "Large 2048", 2048),
("h", "Large 1600", 1600),
("l", "Large", 1024),
("c", "Medium 800", 800),
("z", "Medium 640", 640),
("m", "Medium", 500),
("n", "Small 320", 320),
("s", "Small", 240),
("q", "Large Square", 150),
("t", "Thumbnail", 100),
("s", "Square", 75),
]
def __init__(self, extractor):
oauth.OAuth1API.__init__(self, extractor)
self.maxsize = extractor.config("size-max")
if isinstance(self.maxsize, str):
for fmt, fmtname, fmtwidth in self.FORMATS:
if self.maxsize == fmt or self.maxsize == fmtname:
self.maxsize = fmtwidth
break
else:
self.maxsize = None
extractor.log.warning(
"Could not match '%s' to any format", self.maxsize)
if self.maxsize:
self.formats = [fmt for fmt in self.FORMATS
if not fmt[2] or fmt[2] <= self.maxsize]
else:
self.formats = self.FORMATS
self.formats = self.formats[:4]
self.subcategory = extractor.subcategory
def favorites_getList(self, user_id):
"""Returns a list of the user's favorite photos."""
params = {"user_id": user_id}
return self._listing("favorites.getList", params)
def galleries_getInfo(self, gallery_id):
"""Gets information about a gallery."""
params = {"gallery_id": gallery_id}
gallery = self._call("galleries.getInfo", params)["gallery"]
del gallery["count_views"]
del gallery["count_comments"]
gallery["title"] = gallery["title"]["_content"]
gallery["description"] = gallery["description"]["_content"]
return gallery
def galleries_getPhotos(self, gallery_id):
"""Return the list of photos for a gallery."""
params = {"gallery_id": gallery_id}
return self._listing("galleries.getPhotos", params)
def groups_pools_getPhotos(self, group_id):
"""Returns a list of pool photos for a given group."""
params = {"group_id": group_id}
return self._listing("groups.pools.getPhotos", params)
def people_getPhotos(self, user_id):
"""Return photos from the given user's photostream."""
params = {"user_id": user_id}
return self._listing("people.getPhotos", params)
def photos_getInfo(self, photo_id):
"""Get information about a photo."""
params = {"photo_id": photo_id}
return self._call("photos.getInfo", params)["photo"]
def photos_getSizes(self, photo_id):
"""Returns the available sizes for a photo."""
params = {"photo_id": photo_id}
sizes = self._call("photos.getSizes", params)["sizes"]["size"]
if sizes[-1]["media"] == "video":
# filter all non-video and mobile entries
sizes = [size for size in sizes
if size["media"] == "video" and
not size["label"].startswith(("Mobile ", "Video "))]
if self.maxsize:
for index, size in enumerate(sizes):
if index > 0 and (int(size["width"]) > self.maxsize or
int(size["height"]) > self.maxsize):
del sizes[index:]
break
return sizes
def photos_search(self, params):
"""Return a list of photos matching some criteria."""
return self._listing("photos.search", params.copy())
def photosets_getPhotos(self, photoset_id):
"""Get the list of photos in a set."""
params = {"photoset_id": photoset_id}
return self._pagination("photosets.getPhotos", params)
def urls_lookupGroup(self, groupname):
"""Returns a group NSID, given the url to a group's page."""
params = {"url": "https://www.flickr.com/groups/" + groupname}
group = self._call("urls.lookupGroup", params)["group"]
return {"nsid": group["id"],
"path_alias": groupname,
"groupname": group["groupname"]["_content"]}
def urls_lookupUser(self, username):
"""Returns a user NSID, given the url to a user's photos or profile."""
params = {"url": "https://www.flickr.com/photos/" + username}
user = self._call("urls.lookupUser", params)["user"]
return {"nsid": user["id"],
"path_alias": username,
"username": user["username"]["_content"]}
def _call(self, method, params):
params["method"] = "flickr." + method
params["format"] = "json"
params["nojsoncallback"] = "1"
if self.api_key:
params["api_key"] = self.api_key
data = self.session.get(self.API_URL, params=params).json()
if "code" in data and data["code"] == 1:
raise exception.NotFoundError(self.subcategory)
return data
def _pagination(self, method, params):
params["extras"] = ",".join("url_" + fmt[0] for fmt in self.formats)
params["page"] = 1
while True:
data = self._call(method, params)
for key, obj in data.items():
if not key.startswith("stat"):
break
del obj["page"]
del obj["perpage"]
if "per_page" in obj:
del obj["per_page"]
yield obj
if params["page"] >= obj["pages"]:
break
params["page"] += 1
def _listing(self, method, params):
for photos in self._pagination(method, params):
for photo in photos["photo"]:
self._extract_format(photo)
yield photo
def _extract_format(self, photo):
for fmt, fmtname, fmtwidth in self.formats:
key = "url_" + fmt
if key in photo:
width, height = photo["width_" + fmt], photo["height_" + fmt]
if self.maxsize and (int(width) > self.maxsize or
int(height) > self.maxsize):
continue
# generate photo info
photo["photo"] = {
"source": photo[key],
"width" : width,
"height": height,
"label" : fmtname,
"media" : "photo",
}
# remove excess data
keys = [
key for key in photo.keys()
if key.startswith(("url_", "width_", "height_"))
]
for key in keys:
del photo[key]
break
else:
# extra API call to get photo url and size
photo["photo"] = self.photos_getSizes(photo["id"])[-1]