Merge branch '1.4-dev'

Mike Fährmann
commit f3d770d4e2

@ -194,7 +194,7 @@ OAuth
-----
*gallery-dl* supports user authentication via OAuth_ for
``deviantart``, ``flickr``, ``reddit`` and ``tumblr``.
``deviantart``, ``flickr``, ``reddit``, ``smugmug`` and ``tumblr``.
This is entirely optional, but grants *gallery-dl* the ability
to issue requests on your account's behalf and enables it to access resources
which would otherwise be unavailable to a public user.

@ -100,7 +100,21 @@
"output":
{
"mode": "terminal",
"logfile": "~/gallery-dl/log.txt"
"log": {
"format": "{name}: {message}",
"level": "info"
},
"logfile": {
"path": "~/gallery-dl/log.txt",
"mode": "w",
"level": "debug"
},
"unsupportedfile": {
"path": "~/gallery-dl/unsupported.txt",
"mode": "a",
"format": "{asctime} {message}",
"format-date": "%Y-%m-%d-%H-%M-%S"
}
},
"cache": {

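A minimal sketch of what the new "logfile" block above amounts to at runtime (the wiring itself is done by the setup_logging_handler() helper added further down; format and date-format fall back to the module defaults when unset):

    import logging
    import os.path

    # "logfile": {"path": "~/gallery-dl/log.txt", "mode": "w", "level": "debug"}
    path = os.path.expanduser("~/gallery-dl/log.txt")
    handler = logging.FileHandler(path, mode="w")
    handler.setFormatter(logging.Formatter(
        "[{name}][{levelname}] {message}", "%Y-%m-%d %H:%M:%S", "{"))
    handler.setLevel(logging.DEBUG)
    logging.getLogger().addHandler(handler)

The previous plain-string form ("logfile": "~/gallery-dl/log.txt") keeps working; a string value is normalized to {"path": <string>}.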
@ -68,7 +68,7 @@ Sea Otter Scans https://reader.seaotterscans.com/ Chapters, Manga
Sen Manga http://raw.senmanga.com/ Chapters
Sense-Scans http://sensescans.com/ Chapters, Manga
SlideShare https://www.slideshare.net/ Presentations
SmugMug https://www.smugmug.com/ |Albums, individ-5|
SmugMug https://www.smugmug.com/ |Albums, individ-5| Optional (OAuth)
Subapics https://subapics.com/ Chapters, Manga
The /b/ Archive https://thebarchive.com/ Threads
Tumblr https://www.tumblr.com/ Images from Users, Likes, Posts, Tag-Searches Optional (OAuth)
@ -88,8 +88,8 @@ Turboimagehost https://turboimagehost.com/ individual Images
==================== =================================== ================================================== ================
.. |Images from Use-0| replace:: Images from Users, Albums, Challenges, individual Images, Likes, Search Results
.. |Collections, De-1| replace:: Collections, Deviations, Favorites, Folders, Galleries, Journals
.. |Collections, De-1| replace:: Collections, Deviations, Favorites, Folders, Galleries, Journals, Popular Images
.. |Images from Use-2| replace:: Images from Users, Albums, Favorites, Galleries, Groups, individual Images, Search Results
.. |Images from Use-3| replace:: Images from Users, Doujin, Favorites, individual Images
.. |Images from Use-4| replace:: Images from Users, Bookmarks, Favorites, pixiv.me Links, Rankings, Individual Images
.. |Images from Use-4| replace:: Images from Users, Bookmarks, Favorites, Follows, pixiv.me Links, Rankings, Search Results, Individual Images
.. |Albums, individ-5| replace:: Albums, individual Images, Images from Users and Folders

@ -27,20 +27,72 @@ from . import version, config, option, extractor, job, util, exception
__version__ = version.__version__
log = logging.getLogger("gallery-dl")
def initialize_logging(loglevel, formatter):
LOG_FORMAT = "[{name}][{levelname}] {message}"
LOG_FORMAT_DATE = "%Y-%m-%d %H:%M:%S"
LOG_LEVEL = logging.INFO
def initialize_logging(loglevel):
"""Setup basic logging functionality before configfiles have been loaded"""
# convert levelnames to lowercase
for level in (10, 20, 30, 40, 50):
name = logging.getLevelName(level)
logging.addLevelName(level, name.lower())
# setup basic logging to stderr
formatter = logging.Formatter(LOG_FORMAT, LOG_FORMAT_DATE, "{")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(loglevel)
root = logging.getLogger()
root.setLevel(loglevel)
root.setLevel(logging.NOTSET)
root.addHandler(handler)
def setup_logging_handler(key, fmt=LOG_FORMAT, lvl=LOG_LEVEL):
"""Setup a new logging handler"""
opts = config.interpolate(("output", key))
if not opts:
return None
if isinstance(opts, str):
opts = {"path": opts}
path = opts.get("path")
mode = opts.get("mode", "w")
try:
path = util.expand_path(path)
handler = logging.FileHandler(path, mode)
except (OSError, ValueError) as exc:
log.warning("%s: %s", key, exc)
return None
except TypeError as exc:
log.warning("%s: missing or invalid path (%s)", key, exc)
return None
level = opts.get("level", lvl)
logfmt = opts.get("format", fmt)
datefmt = opts.get("format-date", LOG_FORMAT_DATE)
formatter = logging.Formatter(logfmt, datefmt, "{")
handler.setFormatter(formatter)
handler.setLevel(level)
return handler
def configure_logging_handler(key, handler):
"""Configure a logging handler"""
opts = config.interpolate(("output", key))
if not opts:
return
if isinstance(opts, str):
opts = {"format": opts}
if handler.level == LOG_LEVEL and "level" in opts:
handler.setLevel(opts["level"])
if "format" in opts or "format-date" in opts:
logfmt = opts.get("format", LOG_FORMAT)
datefmt = opts.get("format-date", LOG_FORMAT_DATE)
formatter = logging.Formatter(logfmt, datefmt, "{")
handler.setFormatter(formatter)
def replace_std_streams(errors="replace"):
"""Replace standard streams and set their error handlers to 'errors'"""
for name in ("stdout", "stdin", "stderr"):
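
How the pieces above fit together at startup, as a usage sketch: initialize_logging() installs a single stderr handler and deliberately leaves the root logger at NOTSET, so handlers added later may log more verbosely (e.g. "debug") than the console; main() then adjusts and extends that setup from the configuration:

    import logging

    initialize_logging(logging.INFO)              # stderr handler at INFO, root at NOTSET
    configure_logging_handler(                    # apply "output.log" format/level
        "log", logging.getLogger().handlers[0])
    handler = setup_logging_handler("logfile")    # FileHandler built from "output.logfile"
    if handler:
        logging.getLogger().addHandler(handler)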
@ -159,8 +211,7 @@ def main():
parser = option.build_parser()
args = parser.parse_args()
formatter = logging.Formatter("[%(name)s][%(levelname)s] %(message)s")
initialize_logging(args.loglevel, formatter)
initialize_logging(args.loglevel)
# configuration
if args.load_config:
@ -173,17 +224,13 @@ def main():
config.set(key, value)
config.set(("_",), {})
# logfile
logfile = config.interpolate(("output", "logfile"))
if logfile:
try:
path = util.expand_path(logfile)
handler = logging.FileHandler(path, "w")
except OSError as exc:
log.warning("log file: %s", exc)
else:
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
# stream logging handler
configure_logging_handler("log", logging.getLogger().handlers[0])
# file logging handler
handler = setup_logging_handler("logfile", lvl=args.loglevel)
if handler:
logging.getLogger().addHandler(handler)
# loglevels
if args.loglevel >= logging.ERROR:
@ -243,13 +290,13 @@ def main():
except OSError as exc:
log.warning("input file: %s", exc)
unsupportedfile = config.interpolate(("output", "unsupportedfile"))
if unsupportedfile:
try:
path = util.expand_path(unsupportedfile)
job.Job.ufile = open(path, "w")
except OSError as exc:
log.warning("unsupported-URL file: %s", exc)
# unsupported file logging handler
handler = setup_logging_handler("unsupportedfile", fmt="{message}")
if handler:
ulog = logging.getLogger("unsupported")
ulog.addHandler(handler)
ulog.propagate = False
job.Job.ulog = ulog
prepare_range(args.image_range, "image")
prepare_range(args.chapter_range, "chapter")

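Unsupported URLs are now routed through the logging machinery instead of a raw file object: a dedicated "unsupported" logger with propagate=False keeps them out of the regular log output. The emitting side in job.Job is not part of this hunk; presumably it reduces to something like:

    # hypothetical emit site inside job.Job (not shown in this diff):
    if self.ulog:
        self.ulog.info(url)    # written via the "unsupportedfile" handler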
@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015-2017 Mike Fährmann
# Copyright 2015-2018 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
@ -8,6 +8,7 @@
"""Methods to access sites behind Cloudflare protection"""
import re
import time
import operator
import urllib.parse
@ -30,6 +31,7 @@ def request_func(self, *args, **kwargs):
def solve_challenge(session, response):
session.headers["Referer"] = response.url
page = response.text
params = text.extract_all(page, (
@ -37,58 +39,74 @@ def solve_challenge(session, response):
('pass' , 'name="pass" value="', '"'),
))[0]
params["jschl_answer"] = solve_jschl(response.url, page)
time.sleep(4)
url = urllib.parse.urljoin(response.url, "/cdn-cgi/l/chk_jschl")
url = text.urljoin(response.url, "/cdn-cgi/l/chk_jschl")
return session.get(url, params=params)
def solve_jschl(url, page):
"""Solve challenge to get 'jschl_answer' value"""
# build variable name
# e.g. '...f, wqnVscP={"DERKbJk":+(...' --> wqnVscP.DERKbJk
data, pos = text.extract_all(page, (
('var' , ',f, ', '='),
('key' , '"', '"'),
('expr', ':', '}'),
))
solution = evaluate_expression(data["expr"])
variable = "{}.{}".format(data["var"], data["key"])
vlength = len(variable)
# evaluate the initial expression
solution = evaluate_expression(data["expr"])
# iterate over all remaining expressions
# and combine their values in 'solution'
expressions = text.extract(
page, "'challenge-form');", "f.submit();", pos
)[0]
page, "'challenge-form');", "f.submit();", pos)[0]
for expr in expressions.split(";")[1:]:
if expr.startswith(variable):
# select arithmetic function based on operator (+, -, *)
func = operator_functions[expr[vlength]]
# evaluate the rest of the expression
value = evaluate_expression(expr[vlength+2:])
# combine the expression value with our current solution
solution = func(solution, value)
elif expr.startswith("a.value"):
# add length of the hostname, i.e. add 11 for 'example.org'
solution += len(urllib.parse.urlsplit(url).netloc)
if ".toFixed(" in expr:
# trim the solution to 10 decimal places
# and strip trailing zeros
solution = "{:.10f}".format(solution).rstrip("0")
return solution
def evaluate_expression(expr):
def evaluate_expression(expr, split_re=re.compile(r"\(+([^)]*)\)")):
"""Evaluate a Javascript expression for the challenge"""
if "/" in expr:
# split the expression into numerator and denominator subexpressions,
# evaluate them separately,
# and return their quotient
num, _, denom = expr.partition("/")
return evaluate_expression(num) / evaluate_expression(denom)
stack = []
ranges = []
value = ""
for index, char in enumerate(expr):
if char == "(":
stack.append(index+1)
elif char == ")":
begin = stack.pop()
if stack:
ranges.append((begin, index))
for subexpr in [expr[begin:end] for begin, end in ranges] or (expr,):
num = 0
for part in subexpr.split("[]"):
num += expression_values[part]
value += str(num)
return int(value)
# iterate over all subexpressions,
# evaluate them,
# and accumulate their values in 'result'
result = ""
for subexpr in split_re.findall(expr):
result += str(sum(
expression_values[part]
for part in subexpr.split("[]")
))
return int(result)
operator_functions = {

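To illustrate the evaluator: Cloudflare encodes digits as JSFuck-style bracket expressions, and concatenating the per-subexpression sums yields the decimal number. A self-contained example, assuming a fragment table along these lines (the real expression_values dict is defined elsewhere in cloudflare.py and is not part of this hunk):

    import re

    split_re = re.compile(r"\(+([^)]*)\)")
    # assumed values, following JSFuck semantics: +[] -> 0, !+[] / +!![] -> 1
    expression_values = {"": 0, "+": 0, "!+": 1, "+!!": 1}

    expr = "(!+[]+!![]+!![]+!![])+(!+[]+!![])"
    result = ""
    for subexpr in split_re.findall(expr):    # "!+[]+!![]+!![]+!![]", "!+[]+!![]"
        result += str(sum(
            expression_values[part]           # parts after "[]"-split: "!+", "+!!", ..., ""
            for part in subexpr.split("[]")
        ))
    print(int(result))                        # 4 and 2 concatenated -> 42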
@ -12,7 +12,7 @@ import time
import mimetypes
from requests.exceptions import ConnectionError, Timeout
from .common import DownloaderBase
from .. import util, exception
from .. import text, exception
class Downloader(DownloaderBase):
@ -28,7 +28,7 @@ class Downloader(DownloaderBase):
self.chunk_size = 16384
if self.rate:
self.rate = util.parse_bytes(self.rate)
self.rate = text.parse_bytes(self.rate)
if not self.rate:
self.log.warning("Invalid rate limit specified")
elif self.rate < self.chunk_size:
@ -61,7 +61,7 @@ class Downloader(DownloaderBase):
else:
self.response.raise_for_status()
return offset, util.safe_int(size)
return offset, text.parse_int(size)
def receive(self, file):
if self.rate:

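text.parse_bytes() is the renamed home of the byte-size parser used for the rate limit. A rough, self-contained sketch of the semantics such a parser provides (binary suffixes assumed; not the verbatim gallery-dl implementation):

    def parse_bytes_sketch(value, default=0):
        """Parse a size string like '1.5M' into a byte count (assumed semantics)"""
        suffixes = {"k": 1024, "m": 1024**2, "g": 1024**3}
        try:
            value = value.strip().lower()
            mult = suffixes.get(value[-1], 1)
            num = value[:-1] if value[-1] in suffixes else value
            return int(float(num) * mult)
        except (AttributeError, IndexError, ValueError):
            return default

    parse_bytes_sketch("500k")    # 512000
    parse_bytes_sketch("1.5M")    # 1572864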
@ -73,7 +73,7 @@ class ArtstationExtractor(Extractor):
def get_user_info(self, username):
"""Return metadata for a specific user"""
url = "{}/users/{}/quick.json".format(self.root, username.lower())
response = self.request(url, fatal=False, allow_empty=True)
response = self.request(url, fatal=False)
if response.status_code == 404:
raise exception.NotFoundError("user")
return response.json()
@ -158,7 +158,7 @@ class ArtstationAlbumExtractor(ArtstationExtractor):
def __init__(self, match):
ArtstationExtractor.__init__(self, match)
self.album_id = util.safe_int(match.group(2))
self.album_id = text.parse_int(match.group(2))
def metadata(self):
userinfo = self.get_user_info(self.user)
@ -256,7 +256,7 @@ class ArtstationChallengeExtractor(ArtstationExtractor):
def _id_from_url(url):
"""Get an image's submission ID from its URL"""
parts = url.split("/")
return util.safe_int("".join(parts[7:10]))
return text.parse_int("".join(parts[7:10]))
class ArtstationSearchExtractor(ArtstationExtractor):

@ -10,7 +10,6 @@
from .common import SharedConfigExtractor, Message
from .. import text
from urllib.parse import urljoin
from xml.etree import ElementTree
import datetime
import operator
@ -52,7 +51,7 @@ class BooruExtractor(SharedConfigExtractor):
try:
url = image["file_url"]
if url.startswith("/"):
url = urljoin(self.api_url, url)
url = text.urljoin(self.api_url, url)
image.update(data)
yield Message.Url, url, text.nameext_from_url(url, image)
except KeyError:

@ -52,34 +52,34 @@ class Extractor():
("extractor", self.category, self.subcategory, key), default)
def request(self, url, method="GET", encoding=None, fatal=True, retries=3,
allow_empty=False, *args, **kwargs):
max_retries = retries
*args, **kwargs):
max_tries = retries
while True:
try:
response = None
response = self.session.request(method, url, *args, **kwargs)
if fatal:
response.raise_for_status()
if encoding:
response.encoding = encoding
if response.content or allow_empty:
return response
msg = "empty response body"
except requests.exceptions.HTTPError as exc:
except (requests.ConnectionError, requests.Timeout) as exc:
msg = exc
code = response.status_code
if 400 <= code < 500 and code != 429: # Client Error
retries = 0
except requests.exceptions.RequestException as exc:
msg = exc
if not retries:
raise exception.HttpError(msg)
if response and response.status_code == 429: # Too Many Requests
waittime = float(response.headers.get("Retry-After", 10.0))
raise exception.HttpError(exc)
else:
waittime = 1
if 200 <= response.status_code < 400 or not fatal:
if encoding:
response.encoding = encoding
return response
msg = "{} HTTP Error: {} for url: {}".format(
response.status_code, response.reason, url)
if response.status_code < 500 and response.status_code != 429:
break
if not retries:
break
tries = max_tries - retries
retries -= 1
time.sleep(waittime * (max_retries - retries))
self.log.debug("%s (%d/%d)", msg, tries, max_tries)
time.sleep(2 ** tries)
raise exception.HttpError(msg)
def _get_auth_info(self):
"""Return authentication information as (username, password) tuple"""

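The rewritten retry loop distinguishes failure classes: connection errors and timeouts are always retried, other RequestExceptions raise immediately, 2xx/3xx responses (or any response when fatal is false) are returned, and client errors other than 429 give up without retrying. The delay between attempts now grows exponentially instead of linearly:

    # with the default retries=3, a request that keeps failing runs through:
    #   attempt 1 fails -> sleep 2**0 = 1s  (tries = 0)
    #   attempt 2 fails -> sleep 2**1 = 2s  (tries = 1)
    #   attempt 3 fails -> sleep 2**2 = 4s  (tries = 2)
    #   attempt 4 fails -> retries exhausted, exception.HttpError(msg)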
@ -9,7 +9,7 @@
"""Extract images from https://www.deviantart.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, exception
from ..cache import cache, memcache
import itertools
import datetime
@ -62,7 +62,7 @@ class DeviantartExtractor(Extractor):
if "videos" in deviation:
video = max(deviation["videos"],
key=lambda x: util.safe_int(x["quality"][:-1]))
key=lambda x: text.parse_int(x["quality"][:-1]))
yield self.commit(deviation, video)
if "flash" in deviation:

@ -9,7 +9,7 @@
"""Extract manga-chapters from https://dynasty-scans.com/"""
from .common import ChapterExtractor
from .. import text, util
from .. import text
import re
import json
@ -53,7 +53,7 @@ class DynastyscansChapterExtractor(ChapterExtractor):
return {
"manga": text.unescape(match.group(1)),
"chapter": util.safe_int(match.group(2)),
"chapter": text.parse_int(match.group(2)),
"chapter_minor": match.group(3) or "",
"title": text.unescape(match.group(4) or ""),
"author": text.remove_html(author),

@ -120,7 +120,7 @@ class ExhentaiGalleryExtractor(ExhentaiExtractor):
self.key = {}
self.count = 0
self.version, self.gid, self.token = match.groups()
self.gid = util.safe_int(self.gid)
self.gid = text.parse_int(self.gid)
def items(self):
self.login()
@ -163,8 +163,8 @@ class ExhentaiGalleryExtractor(ExhentaiExtractor):
data["lang"] = util.language_to_code(data["language"])
data["title"] = text.unescape(data["title"])
data["title_jp"] = text.unescape(data["title_jp"])
data["count"] = util.safe_int(data["count"])
data["gallery_size"] = util.parse_bytes(
data["count"] = text.parse_int(data["count"])
data["gallery_size"] = text.parse_bytes(
data["gallery_size"].rstrip("Bb"))
return data
@ -245,18 +245,18 @@ class ExhentaiGalleryExtractor(ExhentaiExtractor):
def _parse_image_info(url):
parts = url.split("/")[4].split("-")
return {
"width": util.safe_int(parts[2]),
"height": util.safe_int(parts[3]),
"size": util.safe_int(parts[1]),
"width": text.parse_int(parts[2]),
"height": text.parse_int(parts[3]),
"size": text.parse_int(parts[1]),
}
@staticmethod
def _parse_original_info(info):
parts = info.lstrip().split(" ")
return {
"width": util.safe_int(parts[0]),
"height": util.safe_int(parts[2]),
"size": util.parse_bytes(parts[3] + parts[4][0]),
"width": text.parse_int(parts[0]),
"height": text.parse_int(parts[2]),
"size": text.parse_bytes(parts[3] + parts[4][0]),
}
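
A worked example for _parse_original_info, assuming the gallery page reports the original image as "1280 x 1920 2.03 MB" (hypothetical sample string):

    info = " 1280 x 1920 2.03 MB"
    parts = info.lstrip().split(" ")    # ["1280", "x", "1920", "2.03", "MB"]
    width  = int(parts[0])              # 1280
    height = int(parts[2])              # 1920
    size_str = parts[3] + parts[4][0]   # "2.03M", then text.parse_bytes(size_str)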
@ -274,7 +274,7 @@ class ExhentaiSearchExtractor(ExhentaiExtractor):
def __init__(self, match):
ExhentaiExtractor.__init__(self)
self.params = text.parse_query(match.group(1) or "")
self.params["page"] = util.safe_int(self.params.get("page"))
self.params["page"] = text.parse_int(self.params.get("page"))
self.url = self.root
def items(self):
@ -308,7 +308,7 @@ class ExhentaiSearchExtractor(ExhentaiExtractor):
return Message.Queue, url, {
"type": gtype,
"date": date,
"gallery_id": util.safe_int(parts[1]),
"gallery_id": text.parse_int(parts[1]),
"gallery_token": parts[2],
"title": text.unescape(title),
key: last,

@ -98,8 +98,8 @@ class FallenangelsMangaExtractor(MangaExtractor):
chapter, dot, minor = chapter.partition(".")
results.append((url, {
"manga": manga, "title": title,
"volume": util.safe_int(volume),
"chapter": util.safe_int(chapter),
"volume": text.parse_int(volume),
"chapter": text.parse_int(chapter),
"chapter_minor": dot + minor,
"lang": self.lang, "language": language,
}))

@ -9,7 +9,7 @@
"""Extract images from https://www.flickr.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, oauth, util, exception
class FlickrExtractor(Extractor):
@ -243,7 +243,7 @@ class FlickrSearchExtractor(FlickrExtractor):
return self.api.photos_search(self.search)
class FlickrAPI():
class FlickrAPI(oauth.OAuth1API):
"""Minimal interface for the flickr API"""
API_URL = "https://api.flickr.com/services/rest/"
API_KEY = "ac4fd7aa98585b9eee1ba761c209de68"
@ -264,17 +264,7 @@ class FlickrAPI():
]
def __init__(self, extractor):
self.api_key = extractor.config("api-key", self.API_KEY)
self.api_secret = extractor.config("api-secret", self.API_SECRET)
token = extractor.config("access-token")
token_secret = extractor.config("access-token-secret")
if token and token_secret:
self.session = util.OAuthSession(
extractor.session,
self.api_key, self.api_secret, token, token_secret)
self.api_key = None
else:
self.session = extractor.session
oauth.OAuth1API.__init__(self, extractor)
self.maxsize = extractor.config("size-max")
if isinstance(self.maxsize, str):

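The per-extractor OAuth plumbing deleted here moves into the shared oauth.OAuth1API base class. Judging from the removed lines, its constructor presumably does roughly the following (a sketch of oauth.py, not the verbatim implementation; OAuth1Session is the signing session wrapper also introduced by this commit):

    class OAuth1API:
        """Base class for OAuth1.0-based API interfaces (sketch)"""
        API_KEY = None
        API_SECRET = None

        def __init__(self, extractor):
            api_key = extractor.config("api-key", self.API_KEY)
            api_secret = extractor.config("api-secret", self.API_SECRET)
            token = extractor.config("access-token")
            token_secret = extractor.config("access-token-secret")

            if token and token_secret:
                # sign requests on the user's behalf
                self.session = OAuth1Session(
                    api_key, api_secret, token, token_secret)
                self.api_key = None
            else:
                # fall back to the extractor's plain session + API key
                self.session = extractor.session
                self.api_key = api_key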
@ -50,8 +50,8 @@ class FoolslideExtractor(SharedConfigExtractor):
lang = info[1].partition("-")[0]
data["lang"] = lang
data["language"] = util.code_to_language(lang)
data["volume"] = util.safe_int(info[2])
data["chapter"] = util.safe_int(info[3])
data["volume"] = text.parse_int(info[2])
data["chapter"] = text.parse_int(info[3])
data["chapter_minor"] = "." + info[4] if len(info) >= 5 else ""
return data
@ -75,7 +75,7 @@ class FoolslideChapterExtractor(FoolslideExtractor):
imgs = self.get_images(page)
data["count"] = len(imgs)
data["chapter_id"] = util.safe_int(imgs[0]["chapter_id"])
data["chapter_id"] = text.parse_int(imgs[0]["chapter_id"])
yield Message.Version, 1
yield Message.Directory, data
@ -88,7 +88,7 @@ class FoolslideChapterExtractor(FoolslideExtractor):
except KeyError:
pass
for key in ("height", "id", "size", "width"):
image[key] = util.safe_int(image[key])
image[key] = text.parse_int(image[key])
data.update(image)
text.nameext_from_url(data["filename"], data)
yield Message.Url, url, data

@ -37,7 +37,7 @@ class GelbooruExtractor(SharedConfigExtractor):
if isinstance(post, str):
post = self.get_post_data(post)
for key in ("id", "width", "height", "score", "change"):
post[key] = util.safe_int(post[key])
post[key] = text.parse_int(post[key])
url = post["file_url"]
post.update(data)
yield Message.Url, url, text.nameext_from_url(url, post)
@ -174,7 +174,7 @@ class GelbooruPoolExtractor(GelbooruExtractor):
raise exception.NotFoundError("pool")
return {
"pool": util.safe_int(self.pool_id),
"pool": text.parse_int(self.pool_id),
"pool_name": text.unescape(name),
"count": len(self.posts),
}

@ -9,8 +9,7 @@
"""Extract images from http://www.hbrowse.com/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from urllib.parse import urljoin
from .. import text
import json
@ -30,7 +29,7 @@ class HbrowseExtractor():
), values=data)
data["manga"] = text.unescape(data["manga"])
data["total"] = util.safe_int(data["total"])
data["total"] = text.parse_int(data["total"])
data["artist"] = text.remove_html(data["artist"])
data["origin"] = text.remove_html(data["origin"])
return data
@ -48,7 +47,7 @@ class HbrowseMangaExtractor(HbrowseExtractor, MangaExtractor):
def chapters(self, page):
results = []
data = self.parse_page(page, {
"manga_id": util.safe_int(
"manga_id": text.parse_int(
self.url.rstrip("/").rpartition("/")[2])
})
@ -59,9 +58,9 @@ class HbrowseMangaExtractor(HbrowseExtractor, MangaExtractor):
if not url:
return results
title, pos = text.extract(page, '>View ', '<', pos)
data["chapter"] = util.safe_int(url.rpartition("/")[2][1:])
data["chapter"] = text.parse_int(url.rpartition("/")[2][1:])
data["title"] = title
results.append((urljoin(self.root, url), data.copy()))
results.append((text.urljoin(self.root, url), data.copy()))
class HbrowseChapterExtractor(HbrowseExtractor, ChapterExtractor):
@ -84,8 +83,8 @@ class HbrowseChapterExtractor(HbrowseExtractor, ChapterExtractor):
def get_metadata(self, page):
return self.parse_page(page, {
"manga_id": util.safe_int(self.gid),
"chapter": util.safe_int(self.chapter)
"manga_id": text.parse_int(self.gid),
"chapter": text.parse_int(self.chapter)
})
def get_images(self, page):

@ -9,7 +9,7 @@
"""Extract hentai-manga from https://hentai2read.com/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from .. import text
import re
import json
@ -36,7 +36,8 @@ class Hentai2readMangaExtractor(MangaExtractor):
page, '<span itemprop="name">', '</span>')
mtype, pos = text.extract(
page, '<small class="text-danger">[', ']</small>', pos)
manga_id = util.safe_int(text.extract(page, 'data-mid="', '"', pos)[0])
manga_id = text.parse_int(text.extract(
page, 'data-mid="', '"', pos)[0])
while True:
chapter_id, pos = text.extract(page, ' data-cid="', '"', pos)
@ -49,8 +50,8 @@ class Hentai2readMangaExtractor(MangaExtractor):
chapter, _, title = text.unescape(chapter).strip().partition(" - ")
results.append((url, {
"manga_id": manga_id, "manga": manga, "type": mtype,
"chapter_id": util.safe_int(chapter_id),
"chapter": util.safe_int(chapter),
"chapter_id": text.parse_int(chapter_id),
"chapter": text.parse_int(chapter),
"title": title, "lang": "en", "language": "English",
}))
@ -78,9 +79,9 @@ class Hentai2readChapterExtractor(ChapterExtractor):
r"(\d+): (.+) . Page 1 ", title)
return {
"manga": match.group(1),
"manga_id": util.safe_int(manga_id),
"chapter": util.safe_int(self.chapter),
"chapter_id": util.safe_int(chapter_id),
"manga_id": text.parse_int(manga_id),
"chapter": text.parse_int(self.chapter),
"chapter_id": text.parse_int(chapter_id),
"type": match.group(2),
"author": match.group(3),
"title": match.group(5),

@ -10,7 +10,6 @@
from .common import Extractor, Message
from .. import text, util, exception
from urllib.parse import urljoin
class HentaifoundryExtractor(Extractor):
@ -47,13 +46,13 @@ class HentaifoundryExtractor(Extractor):
def get_image_metadata(self, url):
"""Collect metadata for an image"""
page = self.request(urljoin(self.root, url)).text
page = self.request(text.urljoin(self.root, url)).text
index = url.rsplit("/", 2)[1]
title, pos = text.extract(
page, 'Pictures</a> &raquo; <span>', '<')
part, pos = text.extract(
page, '//pictures.hentai-foundry.com', '"', pos)
data = {"index": util.safe_int(index), "title": text.unescape(title)}
data = {"index": text.parse_int(index), "title": text.unescape(title)}
text.nameext_from_url(part, data)
return "https://pictures.hentai-foundry.com" + part, data
@ -77,7 +76,7 @@ class HentaifoundryUserExtractor(HentaifoundryExtractor):
def __init__(self, match):
HentaifoundryExtractor.__init__(self, match.group(1) or match.group(3))
self.start_page = util.safe_int(match.group(2), 1)
self.start_page = text.parse_int(match.group(2), 1)
self._skipped = (self.start_page - 1) * self.per_page
def skip(self, num):
@ -104,7 +103,7 @@ class HentaifoundryUserExtractor(HentaifoundryExtractor):
if response.status_code == 404:
raise exception.NotFoundError("user")
count = util.safe_int(text.extract(
count = text.parse_int(text.extract(
response.text, 'class="active" >Pictures (', ')')[0])
if self._skipped >= count:
raise exception.StopExtraction()
@ -142,7 +141,7 @@ class HentaifoundryUserExtractor(HentaifoundryExtractor):
"filter_type": 0,
}
self.request("https://www.hentai-foundry.com/site/filters",
method="post", data=formdata, allow_empty=True)
method="post", data=formdata)
class HentaifoundryImageExtractor(HentaifoundryExtractor):
@ -171,4 +170,4 @@ class HentaifoundryImageExtractor(HentaifoundryExtractor):
return ("{}/{}?enterAgree=1".format(self.artist_url, self.index),)
def get_job_metadata(self):
return {"artist": self.artist, "index": util.safe_int(self.index)}
return {"artist": self.artist, "index": text.parse_int(self.index)}

@ -9,7 +9,7 @@
"""Extract hentai-manga from https://hentaihere.com/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from .. import text
import re
import json
@ -32,7 +32,7 @@ class HentaihereMangaExtractor(MangaExtractor):
def chapters(self, page):
results = []
manga_id = util.safe_int(
manga_id = text.parse_int(
self.url.rstrip("/").rpartition("/")[2][1:])
manga, pos = text.extract(
page, '<span itemprop="name">', '</span>')
@ -50,8 +50,8 @@ class HentaihereMangaExtractor(MangaExtractor):
chapter, _, title = text.unescape(chapter).strip().partition(" - ")
results.append((url, {
"manga_id": manga_id, "manga": manga, "type": mtype,
"chapter_id": util.safe_int(chapter_id),
"chapter": util.safe_int(chapter),
"chapter_id": text.parse_int(chapter_id),
"chapter": text.parse_int(chapter),
"title": title, "lang": "en", "language": "English",
}))
@ -79,9 +79,9 @@ class HentaihereChapterExtractor(ChapterExtractor):
match = re.match(pattern, title)
return {
"manga": match.group(1),
"manga_id": util.safe_int(self.manga_id),
"chapter": util.safe_int(self.chapter),
"chapter_id": util.safe_int(chapter_id),
"manga_id": text.parse_int(self.manga_id),
"chapter": text.parse_int(self.chapter),
"chapter_id": text.parse_int(chapter_id),
"type": match.group(2),
"title": match.group(3),
"author": match.group(4),

@ -30,7 +30,7 @@ class HitomiGalleryExtractor(ChapterExtractor):
]
def __init__(self, match):
self.gid = util.safe_int(match.group(1))
self.gid = text.parse_int(match.group(1))
url = "https://hitomi.la/galleries/{}.html".format(self.gid)
ChapterExtractor.__init__(self, url)

@ -9,7 +9,7 @@
"""Extract images from http://imagefap.com/"""
from .common import Extractor, Message
from .. import text, util
from .. import text
import json
@ -159,7 +159,7 @@ class ImagefapUserExtractor(ImagefapExtractor):
yield Message.Version, 1
for gid, name in self.get_gallery_data():
url = "http://www.imagefap.com/gallery/" + gid
data = {"gallery_id": util.safe_int(gid), "title": name}
data = {"gallery_id": text.parse_int(gid), "title": name}
yield Message.Queue, url, data
def get_gallery_data(self):

@ -12,7 +12,6 @@ from .common import Extractor, Message
from .. import text, exception
from ..cache import memcache
from os.path import splitext
from urllib.parse import urljoin
class ImagehostImageExtractor(Extractor):
@ -142,8 +141,7 @@ class ImagevenueImageExtractor(ImagehostImageExtractor):
def get_info(self, page):
url = text.extract(page, 'SRC="', '"')[0]
url = urljoin(self.url, url)
return url, url
return text.urljoin(self.url, url), url
class ImagetwistImageExtractor(ImagehostImageExtractor):

@ -10,7 +10,6 @@
from .common import AsynchronousExtractor, Message
from .. import text, exception
from urllib.parse import urljoin
class KhinsiderSoundtrackExtractor(AsynchronousExtractor):
@ -63,7 +62,8 @@ class KhinsiderSoundtrackExtractor(AsynchronousExtractor):
page = text.extract(page, '<table id="songlist">', '</table>')[0]
for num, url in enumerate(text.extract_iter(
page, '<td class="clickable-row"><a href="', '"'), 1):
page = self.request(urljoin(self.root, url), encoding="utf-8").text
url = text.urljoin(self.root, url)
page = self.request(url, encoding="utf-8").text
url = text.extract(
page, '<p><a style="color: #21363f;" href="', '"')[0]
yield url, text.nameext_from_url(url, {"num": num})

@ -9,7 +9,7 @@
"""Extract manga-chapters and entire manga from http://kissmanga.com/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util, cloudflare, aes, exception
from .. import text, cloudflare, aes, exception
from ..cache import cache
import re
import hashlib
@ -56,8 +56,8 @@ class KissmangaBase():
), data["chapter_string"])
volume, chapter, minor, title = match.groups()
data["volume"] = util.safe_int(volume)
data["chapter"] = util.safe_int(chapter)
data["volume"] = text.parse_int(volume)
data["chapter"] = text.parse_int(chapter)
data["chapter_minor"] = "." + minor if minor else ""
data["title"] = title if title and title != "Read Online" else ""
return data
@ -89,7 +89,7 @@ class KissmangaMangaExtractor(KissmangaBase, MangaExtractor):
url, _, chapter = item.partition(needle)
data = {
"manga": manga, "chapter_string": chapter,
"chapter_id": util.safe_int(url.rpartition("=")[2]),
"chapter_id": text.parse_int(url.rpartition("=")[2]),
"lang": "en", "language": "English",
}
self.parse_chapter_string(data)
@ -128,7 +128,7 @@ class KissmangaChapterExtractor(KissmangaBase, ChapterExtractor):
data = {
"manga": manga.strip(),
"chapter_string": cinfo.strip(),
"chapter_id": util.safe_int(self.chapter_id),
"chapter_id": text.parse_int(self.chapter_id),
"lang": "en",
"language": "English",
}

@ -9,7 +9,7 @@
"""Extract manga-chapters and entire manga from https://komikcast.com/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util, cloudflare
from .. import text, cloudflare
import re
@ -39,7 +39,7 @@ class KomikcastBase():
data["title"] = title.strip()
else:
data["title"] = ""
data["chapter"] = util.safe_int(chapter)
data["chapter"] = text.parse_int(chapter)
data["lang"] = "id"
data["language"] = "Indonesian"
@ -75,8 +75,8 @@ class KomikcastChapterExtractor(KomikcastBase, ChapterExtractor):
page, '<div id="readerarea">', '<div class="navig">')[0]
return [
(url, {
"width": util.safe_int(width),
"height": util.safe_int(height),
"width": text.parse_int(width),
"height": text.parse_int(height),
})
for url, width, height in re.findall(
r"<img[^>]*? src=[\"']([^\"']+)[\"']"

@ -10,7 +10,6 @@
from .common import ChapterExtractor, MangaExtractor
from .. import text, util, exception
from urllib.parse import urljoin
import json
import re
@ -69,8 +68,8 @@ class MangadexChapterExtractor(MangadexExtractor, ChapterExtractor):
"manga": data["manga_title"],
"manga_id": data["manga_id"],
"title": data["chapter_title"],
"volume": util.safe_int(match.group(1)),
"chapter": util.safe_int(match.group(2)),
"volume": text.parse_int(match.group(1)),
"chapter": text.parse_int(match.group(2)),
"chapter_minor": match.group(3) or "",
"chapter_id": data["chapter_id"],
"chapter_string": info.replace(" - MangaDex", ""),
@ -84,7 +83,7 @@ class MangadexChapterExtractor(MangadexExtractor, ChapterExtractor):
pagelist, pos = text.extract(page, "var page_array = [", "]", pos)
server , pos = text.extract(page, "var server = '", "'", pos)
base = urljoin(self.root, server + dataurl + "/")
base = text.urljoin(self.root, server + dataurl + "/")
return [
(base + page, None)
@ -130,7 +129,7 @@ class MangadexMangaExtractor(MangadexExtractor, MangaExtractor):
manga = text.unescape(extr(
page, '"og:title" content="', '"')[0].rpartition(" (")[0])
manga_id = util.safe_int(extr(
manga_id = text.parse_int(extr(
page, '/images/manga/', '.')[0])
while True:
@ -151,15 +150,15 @@ class MangadexMangaExtractor(MangadexExtractor, MangaExtractor):
results.append((self.root + "/chapter/" + chid, {
"manga": manga,
"manga_id": util.safe_int(manga_id),
"manga_id": text.parse_int(manga_id),
"title": text.unescape(title),
"volume": util.safe_int(volume),
"chapter": util.safe_int(chapter),
"volume": text.parse_int(volume),
"chapter": text.parse_int(chapter),
"chapter_minor": sep + minor,
"chapter_id": util.safe_int(chid),
"chapter_id": text.parse_int(chid),
"group": text.unescape(text.remove_html(group)),
"contributor": text.remove_html(user),
"views": util.safe_int(views),
"views": text.parse_int(views),
"date": date,
"lang": util.language_to_code(language),
"language": language,

@ -9,7 +9,7 @@
"""Extract manga-chapters and entire manga from http://fanfox.net/"""
from .common import ChapterExtractor
from .. import text, util, exception
from .. import text, exception
import re
@ -47,7 +47,7 @@ class MangafoxChapterExtractor(ChapterExtractor):
data["chapter_minor"] = match.group(4) or ""
data["manga"] = data["manga"].rpartition(" ")[0]
for key in ("sid", "cid", "count", "volume", "chapter"):
data[key] = util.safe_int(data[key])
data[key] = text.parse_int(data[key])
return data
def get_images(self, page):

@ -9,9 +9,8 @@
"""Extract manga-chapters and entire manga from http://www.mangahere.co/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from .. import text
from ..cache import memcache
from urllib.parse import urljoin
import re
@ -58,10 +57,10 @@ class MangahereMangaExtractor(MangahereBase, MangaExtractor):
volume, pos = text.extract(page, 'span class="mr6">', '<', pos)
title, pos = text.extract(page, '/span>', '<', pos)
date, pos = text.extract(page, 'class="right">', '</span>', pos)
results.append((urljoin("http:", url), {
results.append((text.urljoin("http:", url), {
"manga": manga, "title": title, "date": date,
"volume": util.safe_int(volume.rpartition(" ")[2]),
"chapter": util.safe_int(chapter),
"volume": text.parse_int(volume.rpartition(" ")[2]),
"chapter": text.parse_int(chapter),
"chapter_minor": dot + minor,
"lang": "en", "language": "English",
}))
@ -98,12 +97,12 @@ class MangahereChapterExtractor(MangahereBase, ChapterExtractor):
return {
"manga": text.unescape(manga),
"manga_id": util.safe_int(mid),
"manga_id": text.parse_int(mid),
"title": self._get_title_map(mid).get(self.chapter),
"volume": util.safe_int(self.volume),
"chapter": util.safe_int(chapter),
"volume": text.parse_int(self.volume),
"chapter": text.parse_int(chapter),
"chapter_minor": dot + minor,
"count": util.safe_int(count),
"count": text.parse_int(count),
"lang": "en",
"language": "English",
}

@ -9,8 +9,7 @@
"""Extract manga-chapters and entire manga from https://mangapark.me/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from urllib.parse import urljoin
from .. import text
class MangaparkExtractor():
@ -25,12 +24,12 @@ class MangaparkExtractor():
for part in path.split("/")[3:]:
key, value = part[0], part[1:]
if key == "s":
data["version"] = util.safe_int(value)
data["version"] = text.parse_int(value)
elif key == "v":
data["volume"] = util.safe_int(value)
data["volume"] = text.parse_int(value)
elif key == "c":
chapter, dot, minor = value.partition(".")
data["chapter"] = util.safe_int(chapter)
data["chapter"] = text.parse_int(chapter)
data["chapter_minor"] = dot + minor
elif key == "e":
data["chapter_minor"] = "v" + value
@ -64,7 +63,7 @@ class MangaparkMangaExtractor(MangaparkExtractor, MangaExtractor):
self.parse_chapter_path(path, data)
data["title"] = title[3:].strip()
data["date"] = date
data["count"] = util.safe_int(count)
data["count"] = text.parse_int(count)
results.append((self.root + path, data.copy()))
@ -107,7 +106,7 @@ class MangaparkChapterExtractor(MangaparkExtractor, ChapterExtractor):
data["manga"], _, data["type"] = data["manga"].rpartition(" ")
data["manga"] = text.unescape(data["manga"])
data["title"] = data["title"].partition(": ")[2]
data["count"] = util.safe_int(data["count"])
data["count"] = text.parse_int(data["count"])
return data
def get_images(self, page):
@ -120,7 +119,7 @@ class MangaparkChapterExtractor(MangaparkExtractor, ChapterExtractor):
num += 1
width , pos = text.extract(page, ' width="', '"', pos)
height, pos = text.extract(page, ' _heighth="', '"', pos)
yield urljoin(self.root, url), {
yield text.urljoin(self.root, url), {
"page": num,
"width": width,
"height": height,

@ -9,7 +9,7 @@
"""Extract manga-chapters and entire manga from https://www.mangareader.net/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util
from .. import text
class MangareaderBase():
@ -53,7 +53,7 @@ class MangareaderMangaExtractor(MangareaderBase, MangaExtractor):
return results
data["title"], pos = text.extract(page, '</a> : ', '</td>', pos)
data["date"] , pos = text.extract(page, '<td>', '</td>', pos)
data["chapter"] = util.safe_int(url.rpartition("/")[2])
data["chapter"] = text.parse_int(url.rpartition("/")[2])
results.append((self.root + url, data.copy()))
@ -79,7 +79,7 @@ class MangareaderChapterExtractor(MangareaderBase, ChapterExtractor):
"""Collect metadata for extractor-job"""
page = self.request(self.root + self.url_title).text
data = self.parse_page(page, {
"chapter": util.safe_int(self.chapter),
"chapter": text.parse_int(self.chapter),
"lang": "en",
"language": "English",
})
@ -87,7 +87,7 @@ class MangareaderChapterExtractor(MangareaderBase, ChapterExtractor):
('title', ' ' + self.chapter + '</a> : ', '</td>'),
('date', '<td>', '</td>'),
), page.index('<div id="chapterlist">'), data)
data["count"] = util.safe_int(text.extract(
data["count"] = text.parse_int(text.extract(
chapter_page, '</select> of ', '<')[0]
)
return data
@ -118,6 +118,6 @@ class MangareaderChapterExtractor(MangareaderBase, ChapterExtractor):
height, pos = extr(page, ' height="', '"', pos)
image, pos = extr(page, ' src="', '"', pos)
return self.root + url, image, {
"width": util.safe_int(width),
"height": util.safe_int(height),
"width": text.parse_int(width),
"height": text.parse_int(height),
}

@ -9,8 +9,7 @@
"""Extract manga-chapters from https://mangastream.com/"""
from .common import ChapterExtractor
from .. import text, util
from urllib.parse import urljoin
from .. import text
class MangastreamChapterExtractor(ChapterExtractor):
@ -35,9 +34,9 @@ class MangastreamChapterExtractor(ChapterExtractor):
return {
"manga": manga,
"chapter": text.unquote(self.chapter),
"chapter_id": util.safe_int(self.ch_id),
"chapter_id": text.parse_int(self.ch_id),
"title": title,
"count": util.safe_int(count, 1),
"count": text.parse_int(count, 1),
"lang": "en",
"language": "English",
}
@ -47,5 +46,5 @@ class MangastreamChapterExtractor(ChapterExtractor):
pos = page.index(' class="page"')
next_url = text.extract(page, ' href="', '"', pos)[0]
image_url = text.extract(page, ' src="', '"', pos)[0]
yield urljoin(self.base_url, image_url), None
page = self.request(urljoin(self.base_url, next_url)).text
yield text.urljoin(self.base_url, image_url), None
page = self.request(text.urljoin(self.base_url, next_url)).text

@ -9,7 +9,7 @@
"""Extract images from https://nhentai.net/"""
from .common import Extractor, Message
from .. import text, util
from .. import text
class NHentaiExtractor(Extractor):
@ -95,7 +95,7 @@ class NhentaiSearchExtractor(NHentaiExtractor):
def _pagination(self, endpoint, params):
"""Pagination over API responses"""
url = "{}/api/{}".format(self.root, endpoint)
params["page"] = util.safe_int(params.get("page"), 1)
params["page"] = text.parse_int(params.get("page"), 1)
while True:
data = self.request(url, params=params, fatal=False).json()

@ -9,7 +9,7 @@
"""Extract images from https://nijie.info/"""
from .common import AsynchronousExtractor, Message
from .. import text, util, exception
from .. import text, exception
from ..cache import cache
@ -44,7 +44,7 @@ class NijieExtractor(AsynchronousExtractor):
def get_job_metadata(self):
"""Collect metadata for extractor-job"""
return {"user_id": util.safe_int(self.user_id)}
return {"user_id": text.parse_int(self.user_id)}
def get_image_ids(self):
"""Collect all relevant image-ids"""
@ -63,8 +63,8 @@ class NijieExtractor(AsynchronousExtractor):
images = list(text.extract_iter(page, '<img src="//pic', '"', pos))
title = title.rpartition("|")[0].strip()
image_id = util.safe_int(image_id)
artist_id = util.safe_int(self._userid_from_popup(page))
image_id = text.parse_int(image_id)
artist_id = text.parse_int(self._userid_from_popup(page))
for index, url in enumerate(images):
yield "https://pic" + url, text.nameext_from_url(url, {
@ -193,8 +193,8 @@ class NijieImageExtractor(NijieExtractor):
self.page = ""
def get_job_metadata(self):
response = self.request(self.popup_url + self.image_id,
allow_redirects=False, allow_empty=True)
response = self.request(
self.popup_url + self.image_id, allow_redirects=False)
if 300 <= response.status_code < 400:
raise exception.NotFoundError("image")
self.page = response.text

@ -9,8 +9,8 @@
"""Utility classes to setup OAuth and link a users account to gallery-dl"""
from .common import Extractor, Message
from . import deviantart, flickr, reddit, tumblr
from .. import text, util, config
from . import deviantart, flickr, reddit, smugmug, tumblr
from .. import text, oauth, config
import os
import urllib.parse
@ -70,21 +70,19 @@ class OAuthBase(Extractor):
def _oauth1_authorization_flow(
self, request_token_url, authorize_url, access_token_url):
"""Perform the OAuth 1.0a authorization flow"""
del self.session.params["oauth_token"]
# get a request token
params = {"oauth_callback": self.redirect_uri}
data = self.session.get(request_token_url, params=params).text
data = text.parse_query(data)
self.session.params["oauth_token"] = token = data["oauth_token"]
self.session.token_secret = data["oauth_token_secret"]
self.session.auth.token_secret = data["oauth_token_secret"]
# get the user's authorization
params = {"oauth_token": token, "perms": "read"}
params = {"oauth_token": data["oauth_token"], "perms": "read"}
data = self.open(authorize_url, params)
# exchange the request token for an access token
# self.session.token = data["oauth_token"]
data = self.session.get(access_token_url, params=data).text
data = text.parse_query(data)
@ -101,7 +99,7 @@ class OAuthBase(Extractor):
state = "gallery-dl_{}_{}".format(
self.subcategory,
util.OAuthSession.nonce(8)
oauth.nonce(8),
)
auth_params = {
@ -182,8 +180,7 @@ class OAuthFlickr(OAuthBase):
def __init__(self, match):
OAuthBase.__init__(self, match)
self.session = util.OAuthSession(
self.session,
self.session = oauth.OAuth1Session(
self.oauth_config("api-key", flickr.FlickrAPI.API_KEY),
self.oauth_config("api-secret", flickr.FlickrAPI.API_SECRET),
)
@ -215,14 +212,34 @@ class OAuthReddit(OAuthBase):
)
class OAuthSmugmug(OAuthBase):
subcategory = "smugmug"
pattern = ["oauth:smugmug$"]
def __init__(self, match):
OAuthBase.__init__(self, match)
self.session = oauth.OAuth1Session(
self.oauth_config("api-key", smugmug.SmugmugAPI.API_KEY),
self.oauth_config("api-secret", smugmug.SmugmugAPI.API_SECRET),
)
def items(self):
yield Message.Version, 1
self._oauth1_authorization_flow(
"https://api.smugmug.com/services/oauth/1.0a/getRequestToken",
"https://api.smugmug.com/services/oauth/1.0a/authorize",
"https://api.smugmug.com/services/oauth/1.0a/getAccessToken",
)
class OAuthTumblr(OAuthBase):
subcategory = "tumblr"
pattern = ["oauth:tumblr$"]
def __init__(self, match):
OAuthBase.__init__(self, match)
self.session = util.OAuthSession(
self.session,
self.session = oauth.OAuth1Session(
self.oauth_config("api-key", tumblr.TumblrAPI.API_KEY),
self.oauth_config("api-secret", tumblr.TumblrAPI.API_SECRET),
)

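Condensed, the OAuth 1.0a flow performed by _oauth1_authorization_flow() runs in three legs, shown here with the SmugMug endpoints from above (simplified; error handling and the browser round-trip omitted):

    # leg 1: obtain a request token
    data = session.get(
        "https://api.smugmug.com/services/oauth/1.0a/getRequestToken",
        params={"oauth_callback": redirect_uri}).text
    data = text.parse_query(data)              # oauth_token, oauth_token_secret
    session.auth.token_secret = data["oauth_token_secret"]

    # leg 2: send the user to .../authorize?oauth_token=...&perms=read
    #        and wait for the redirect back to 'redirect_uri'

    # leg 3: exchange the returned verifier parameters at .../getAccessToken
    #        for the final (access token, access token secret) pair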
@ -9,7 +9,7 @@
"""Extract images from http://rule34.paheal.net/"""
from .common import SharedConfigExtractor, Message
from .. import text, util
from .. import text
class PahealExtractor(SharedConfigExtractor):
@ -27,7 +27,7 @@ class PahealExtractor(SharedConfigExtractor):
for data in self.get_posts():
url = data["file_url"]
for key in ("id", "width", "height"):
data[key] = util.safe_int(data[key])
data[key] = text.parse_int(data[key])
data["tags"] = text.unquote(data["tags"])
yield Message.Url, url, text.nameext_from_url(url, data)
@ -85,7 +85,7 @@ class PahealTagExtractor(PahealExtractor):
return {
"id": pid, "md5": md5, "tags": tags, "file_url": url,
"width": width, "height": height,
"size": util.parse_bytes(size[:-1]),
"size": text.parse_bytes(size[:-1]),
}

@ -9,9 +9,9 @@
"""Extract images and ugoira from https://www.pixiv.net/"""
from .common import Extractor, Message
from .. import text, exception
from .. import text, util, exception
from ..cache import cache
import re
from datetime import datetime, timedelta
class PixivExtractor(Extractor):
@ -20,11 +20,10 @@ class PixivExtractor(Extractor):
directory_fmt = ["{category}", "{user[id]} {user[account]}"]
filename_fmt = "{category}_{user[id]}_{id}{num}.{extension}"
archive_fmt = "{id}{num}.{extension}"
illust_url = "https://www.pixiv.net/member_illust.php?mode=medium"
def __init__(self):
Extractor.__init__(self)
self.api = PixivAPI(self)
self.api = PixivAppAPI(self)
self.user_id = -1
self.load_ugoira = self.config("ugoira", True)
@ -35,70 +34,54 @@ class PixivExtractor(Extractor):
yield Message.Directory, metadata
for work in self.works():
work = self.prepare_work(work)
if not work["user"]["id"]:
continue
meta_single_page = work["meta_single_page"]
meta_pages = work["meta_pages"]
del work["meta_single_page"]
del work["image_urls"]
del work["meta_pages"]
work["num"] = ""
work["tags"] = [tag["name"] for tag in work["tags"]]
work.update(metadata)
if work["type"] == "ugoira":
if not self.load_ugoira:
continue
url, framelist = self.parse_ugoira(work)
ugoira = self.api.ugoira_metadata(work["id"])
url = ugoira["zip_urls"]["medium"].replace(
"_ugoira600x600", "_ugoira1920x1080")
work["extension"] = "zip"
yield Message.Url, url, work
framelist = "".join(
"{file} {delay}\n".format_map(frame)
for frame in ugoira["frames"]
)
work["extension"] = "txt"
yield Message.Url, "text:"+framelist, work
yield Message.Url, "text:" + framelist, work
elif work["page_count"] == 1:
yield Message.Url, work["url"], work
url = meta_single_page["original_image_url"]
work["extension"] = url.rpartition(".")[2]
yield Message.Url, url, work
else:
url, _, ext = work["url"].rpartition("_p0")
for i in range(work["page_count"]):
work["num"] = "_p{:02}".format(i)
yield Message.Url, "{}_p{}{}".format(url, i, ext), work
for num, img in enumerate(meta_pages):
url = img["image_urls"]["original"]
work["num"] = "_p{:02}".format(num)
work["extension"] = url.rpartition(".")[2]
yield Message.Url, url, work
def works(self):
"""Return an iterable containing all relevant 'work'-objects"""
return []
def prepare_work(self, work):
"""Prepare a work-dictionary with additional keywords"""
url = work["image_urls"]["large"]
del work["image_urls"]
work["num"] = ""
work["url"] = url
work["extension"] = url.rpartition(".")[2]
return work
def parse_ugoira(self, data):
"""Parse ugoira data"""
# get illust page
page = self.request(
self.illust_url,
params={"illust_id": data["id"]},
headers={"User-Agent": "Mozilla/5.0"},
).text
# parse page
frames = text.extract(page, ',"frames":[', ']')[0]
# build url
url = re.sub(
r"/img-original/(.+/\d+)[^/]+",
r"/img-zip-ugoira/\g<1>_ugoira1920x1080.zip",
data["url"]
)
# build framelist
framelist = re.sub(
r'\{"file":"([^"]+)","delay":(\d+)\},?',
r'\1 \2\n', frames
)
return url, framelist
def get_metadata(self, user=None):
"""Collect metadata for extractor-job"""
if not user:
user = self.api.user(self.user_id)[0]
user = self.api.user_detail(self.user_id)
return {"user": user}
@ -106,17 +89,23 @@ class PixivUserExtractor(PixivExtractor):
"""Extractor for works of a pixiv-user"""
subcategory = "user"
pattern = [(r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
r"/member(?:_illust)?\.php\?id=(\d+)(?:.*&tag=([^&#]+))?"),
r"/member(?:_illust)?\.php\?id=(\d+)(?:&([^#]+))?"),
(r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
r"/(?:u(?:ser)?/|(?:mypage\.php)?#id=)(\d+)()")]
test = [
("http://www.pixiv.net/member_illust.php?id=173530", {
"url": "852c31ad83b6840bacbce824d85f2a997889efb7",
}),
# illusts with specific tag
(("https://www.pixiv.net/member_illust.php?id=173530"
"&tag=%E6%89%8B%E3%81%B6%E3%82%8D"), {
"url": "25b1cd81153a8ff82eec440dd9f20a4a22079658",
}),
# all sorts of query parameters
(("https://www.pixiv.net/member_illust.php?id=3137110"
"&tag=%E3%83%96%E3%82%A4%E3%82%BA&type=illust&p=2"), {
"count": ">= 55",
}),
("http://www.pixiv.net/member_illust.php?id=173531", {
"exception": exception.NotFoundError,
}),
@ -129,18 +118,32 @@ class PixivUserExtractor(PixivExtractor):
def __init__(self, match):
PixivExtractor.__init__(self)
self.user_id, tag = match.groups()
if tag:
self.tag = text.unquote(tag).lower()
self.works = self._tagged_works
self.user_id, self.query = match.groups()
def works(self):
return self.api.user_works(self.user_id)
works = self.api.user_illusts(self.user_id)
def _tagged_works(self):
for work in self.api.user_works(self.user_id):
if self.tag in [tag.lower() for tag in work["tags"]]:
yield work
if self.query:
qdict = text.parse_query(self.query)
if "type" in qdict:
type_ = qdict["type"].lower()
works = filter(self._is_type(type_), works)
if "tag" in qdict:
tag = text.unquote(qdict["tag"]).lower()
works = filter(self._has_tag(tag), works)
if "p" in qdict: # apply page-offset last
offset = (text.parse_int(qdict["p"], 1) - 1) * 20
works = util.advance(works, offset)
return works
@staticmethod
def _has_tag(tag):
return lambda work: tag in [t["name"].lower() for t in work["tags"]]
@staticmethod
def _is_type(type_):
return lambda work: work["type"] == type_
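
The member_illust.php query parameters are now emulated client-side on top of the App-API work stream: type and tag filters plus the p= page offset (20 works per page). For a hypothetical URL ...?id=173530&type=illust&tag=foo&p=2, works() effectively composes:

    works = self.api.user_illusts(self.user_id)
    works = filter(self._is_type("illust"), works)   # keep illustrations only
    works = filter(self._has_tag("foo"), works)      # case-insensitive tag match
    works = util.advance(works, (2 - 1) * 20)        # skip page 1, i.e. 20 works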
class PixivMeExtractor(PixivExtractor):
@ -188,14 +191,6 @@ class PixivWorkExtractor(PixivExtractor):
"?mode=medium&illust_id=966411"), {
"exception": exception.NotFoundError,
}),
(("http://i1.pixiv.net/c/600x600/img-master/"
"img/2008/06/13/00/29/13/966412_p0_master1200.jpg"), {
"url": "90c1715b07b0d1aad300bce256a0bc71f42540ba",
}),
(("https://i.pximg.net/img-original/"
"img/2017/04/25/07/33/29/62568267_p0.png"), {
"url": "71b8bbd070d6b03a75ca4afb89f64d1445b2278d",
}),
# ugoira
(("https://www.pixiv.net/member_illust.php"
"?mode=medium&illust_id=66806629"), {
@ -203,6 +198,10 @@ class PixivWorkExtractor(PixivExtractor):
r"66806629_ugoira1920x1080\.zip|text:.+"),
"count": 2,
}),
(("http://i1.pixiv.net/c/600x600/img-master/"
"img/2008/06/13/00/29/13/966412_p0_master1200.jpg"), None),
(("https://i.pximg.net/img-original/"
"img/2017/04/25/07/33/29/62568267_p0.png"), None),
("https://www.pixiv.net/i/966412", None),
("http://img.pixiv.net/img/soundcross/42626136.jpg", None),
("http://i2.pixiv.net/img76/img/snailrin/42672235.jpg", None),
@ -218,214 +217,318 @@ class PixivWorkExtractor(PixivExtractor):
return (self.work,)
def get_metadata(self, user=None):
self.work = self.api.work(self.illust_id)[0]
self.work = self.api.illust_detail(self.illust_id)
return PixivExtractor.get_metadata(self, self.work["user"])
class PixivFavoriteExtractor(PixivExtractor):
"""Extractor for all favorites/bookmarks of a pixiv-user"""
subcategory = "favorite"
directory_fmt = ["{category}", "bookmarks", "{user[id]} {user[account]}"]
archive_fmt = "f_{bookmark[id]}{num}.{extension}"
directory_fmt = ["{category}", "bookmarks",
"{user_bookmark[id]} {user_bookmark[account]}"]
archive_fmt = "f_{user_bookmark[id]}_{id}{num}.{extension}"
pattern = [r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
r"/bookmark\.php\?id=(\d+)"]
r"/bookmark\.php(?:\?([^#]*))?"]
test = [
("https://www.pixiv.net/bookmark.php?id=173530", {
"url": "e717eb511500f2fa3497aaee796a468ecf685cc4",
}),
# bookmarks with specific tag
(("https://www.pixiv.net/bookmark.php?id=3137110"
"&tag=%E3%81%AF%E3%82%93%E3%82%82%E3%82%93&p=1"), {
"count": 2,
}),
# own bookmarks
("https://www.pixiv.net/bookmark.php", {
"url": "90c1715b07b0d1aad300bce256a0bc71f42540ba",
}),
# touch URLs
("https://touch.pixiv.net/bookmark.php?id=173530", None),
("https://touch.pixiv.net/bookmark.php", None),
]
def __init__(self, match):
PixivExtractor.__init__(self)
self.user_id = match.group(1)
self.query = text.parse_query(match.group(1))
if "id" not in self.query:
self.subcategory = "bookmark"
def works(self):
return self.api.user_favorite_works(self.user_id)
def prepare_work(self, work):
work["work"]["bookmark"] = {
key: work[key]
for key in ("id", "comment", "tags", "publicity")
}
return PixivExtractor.prepare_work(self, work["work"])
tag = None
restrict = "public"
offset = 0
if "tag" in self.query:
tag = text.unquote(self.query["tag"])
if "rest" in self.query and self.query["rest"] == "hide":
restrict = "private"
if "p" in self.query:
offset = (text.parse_int(self.query["p"], 1) - 1) * 20
class PixivBookmarkExtractor(PixivFavoriteExtractor):
"""Extractor for all favorites/bookmarks of your own account"""
subcategory = "bookmark"
pattern = [r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net/bookmark\.php()$"]
test = [
("https://www.pixiv.net/bookmark.php", None),
("https://touch.pixiv.net/bookmark.php", None),
]
works = self.api.user_bookmarks_illust(self.user_id, tag, restrict)
return util.advance(works, offset)
def get_metadata(self, user=None):
self.api.login()
user = self.api.user_info
if "id" in self.query:
user = self.api.user_detail(self.query["id"])
else:
self.api.login()
user = self.api.user
self.user_id = user["id"]
return PixivExtractor.get_metadata(self, user)
return {"user_bookmark": user}
class PixivRankingExtractor(PixivExtractor):
"""Extractor for pixiv ranking pages"""
subcategory = "ranking"
archive_fmt = "r_{ranking[mode]}_{ranking[date]}_{id}{num}.{extension}"
directory_fmt = ["{category}", "rankings", "{mode}", "{date}"]
directory_fmt = ["{category}", "rankings",
"{ranking[mode]}", "{ranking[date]}"]
pattern = [r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
r"/ranking\.php(?:\?([^#]*))?"]
test = [
(("https://www.pixiv.net/ranking.php"
"?mode=daily&content=illust&date=20170818"), None),
("https://www.pixiv.net/ranking.php?mode=daily&date=20170818", None),
("https://www.pixiv.net/ranking.php", None),
("https://touch.pixiv.net/ranking.php", None),
]
def __init__(self, match):
PixivExtractor.__init__(self)
self.ranking_info = None
self._iter = None
self._first = None
query = text.parse_query(match.group(1))
self.mode = query.get("mode", "daily")
self.content = query.get("content", "all")
self.date = query.get("date")
if self.date:
if len(self.date) == 8 and self.date.isdecimal():
self.date = (self.date[0:4] + "-" +
self.date[4:6] + "-" +
self.date[6:8])
self.query = match.group(1)
self.mode = self.date = None
def works(self):
return self.api.illust_ranking(self.mode, self.date)
def get_metadata(self, user=None):
query = text.parse_query(self.query)
mode = query.get("mode", "daily").lower()
mode_map = {
"daily": "day",
"daily_r18": "day_r18",
"weekly": "week",
"weekly_r18": "week_r18",
"monthly": "month",
"male": "day_male",
"male_r18": "day_male_r18",
"female": "day_female",
"female_r18": "day_female_r18",
"original": "week_original",
"rookie": "week_rookie",
"r18g": "week_r18g",
}
if mode not in mode_map:
self.log.warning("invalid mode '%s'", mode)
mode = "daily"
self.mode = mode_map[mode]
date = query.get("date")
if date:
if len(date) == 8 and date.isdecimal():
date = "{}-{}-{}".format(date[0:4], date[4:6], date[6:8])
else:
self.log.warning("invalid date '%s'", self.date)
self.date = None
self.log.warning("invalid date '%s'", date)
date = None
if not date:
date = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
self.date = date
return {"ranking": {
"mode": mode,
"date": self.date,
}}
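
Two examples of the web-to-App-API translation above; a missing or malformed date falls back to yesterday (UTC), presumably because rankings exist only for completed days (the fallback itself is in the code, the rationale is an assumption):

    # ranking.php?mode=weekly_r18&date=20170818 -> mode "week_r18", date "2017-08-18"
    # ranking.php (no parameters)               -> mode "day", date of yesterday (UTC)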
class PixivSearchExtractor(PixivExtractor):
    """Extractor for pixiv search results"""
    subcategory = "search"
    archive_fmt = "s_{search[word]}_{id}{num}.{extension}"
    directory_fmt = ["{category}", "search", "{search[word]}"]
    pattern = [r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
               r"/search\.php\?([^#]+)"]
    test = [
        ("https://www.pixiv.net/search.php?s_mode=s_tag&word=Original", None),
        ("https://touch.pixiv.net/search.php?word=Original", None),
    ]

    def __init__(self, match):
        PixivExtractor.__init__(self)
        self.query = match.group(1)
        self.word = self.sort = self.target = None

    def works(self):
        return self.api.search_illust(self.word, self.sort, self.target)

    def get_metadata(self, user=None):
        query = text.parse_query(self.query)

        if "word" in query:
            self.word = text.unescape(query["word"])
        else:
            self.log.error("missing search term")
            raise exception.StopExtraction()

        sort = query.get("order", "date_d")
        sort_map = {
            "date": "date_asc",
            "date_d": "date_desc",
        }
        if sort not in sort_map:
            self.log.warning("invalid sort order '%s'", sort)
            sort = "date_d"
        self.sort = sort_map[sort]

        target = query.get("s_mode", "s_tag")
        target_map = {
            "s_tag": "partial_match_for_tags",
            "s_tag_full": "exact_match_for_tags",
            "s_tc": "title_and_caption",
        }
        if target not in target_map:
            self.log.warning("invalid search target '%s'", target)
            target = "s_tag"
        self.target = target_map[target]

        return {"search": {
            "word": self.word,
            "sort": self.sort,
            "target": self.target,
        }}
class PixivFollowExtractor(PixivExtractor):
    """Extractor for new illustrations from your followed artists"""
    subcategory = "follow"
    archive_fmt = "F_{user_follow[id]}_{id}{num}.{extension}"
    directory_fmt = ["{category}", "following"]
    pattern = [r"(?:https?://)?(?:www\.|touch\.)?pixiv\.net"
               r"/bookmark_new_illust\.php"]
    test = [
        ("https://www.pixiv.net/bookmark_new_illust.php", None),
        ("https://touch.pixiv.net/bookmark_new_illust.php", None),
    ]

    def __init__(self, _):
        PixivExtractor.__init__(self)

    def works(self):
        return self.api.illust_follow()

    def get_metadata(self, user=None):
        self.api.login()
        return {"user_follow": self.api.user}
class PixivAppAPI():
    """Minimal interface for the Pixiv App API for mobile devices

    For a more complete implementation or documentation, see
    - https://github.com/upbit/pixivpy
    - https://gist.github.com/ZipFile/3ba99b47162c23f8aea5d5942bb557b1
    """
    CLIENT_ID = "MOBrBDS8blbauoSck0ZfDbtuzpyT"
    CLIENT_SECRET = "lsACyCD94FhDUtGTXi3QzcFE2uU1hqtDaKeqrdwj"

    def __init__(self, extractor):
        self.session = extractor.session
        self.log = extractor.log
        self.username, self.password = extractor._get_auth_info()
        self.user = None

        self.client_id = extractor.config(
            "client-id", self.CLIENT_ID)
        self.client_secret = extractor.config(
            "client-secret", self.CLIENT_SECRET)

        self.session.headers.update({
            "App-OS": "ios",
            "App-OS-Version": "10.3.1",
            "App-Version": "6.7.1",
            "User-Agent": "PixivIOSApp/6.7.1 (iOS 10.3.1; iPhone8,1)",
            "Referer": "https://app-api.pixiv.net/",
        })

    def login(self):
        """Login and gain an access token"""
        self.user, auth = self._login_impl(self.username, self.password)
        self.session.headers["Authorization"] = auth

    @cache(maxage=3590, keyarg=1)
    def _login_impl(self, username, password):
        """Actual login implementation"""
        self.log.info("Logging in as %s", username)

        url = "https://oauth.secure.pixiv.net/auth/token"
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "password",
            "username": username,
            "password": password,
            "get_secure_url": 1,
        }

        response = self.session.post(url, data=data)
        if response.status_code >= 400:
            raise exception.AuthenticationError()

        data = response.json()["response"]
        return data["user"], "Bearer " + data["access_token"]
    def illust_detail(self, illust_id):
        params = {"illust_id": illust_id}
        return self._call("v1/illust/detail", params)["illust"]

    def illust_follow(self, restrict="all"):
        params = {"restrict": restrict}
        return self._pagination("v2/illust/follow", params)

    def illust_ranking(self, mode="day", date=None):
        params = {"mode": mode, "date": date}
        return self._pagination("v1/illust/ranking", params)

    def search_illust(self, word, sort=None, target=None, duration=None):
        params = {"word": word, "search_target": target,
                  "sort": sort, "duration": duration}
        return self._pagination("v1/search/illust", params)

    def user_bookmarks_illust(self, user_id, tag=None, restrict="public"):
        params = {"user_id": user_id, "tag": tag, "restrict": restrict}
        return self._pagination("v1/user/bookmarks/illust", params)

    def user_detail(self, user_id):
        params = {"user_id": user_id}
        return self._call("v1/user/detail", params)["user"]

    def user_illusts(self, user_id):
        params = {"user_id": user_id}
        return self._pagination("v1/user/illusts", params)

    def ugoira_metadata(self, illust_id):
        params = {"illust_id": illust_id}
        return self._call("v1/ugoira/metadata", params)["ugoira_metadata"]
    def _call(self, endpoint, params=None):
        url = "https://app-api.pixiv.net/" + endpoint

        self.login()
        response = self.session.get(url, params=params)

        if 200 <= response.status_code < 400:
            return response.json()
        if response.status_code == 404:
            raise exception.NotFoundError()
        self.log.error("API request failed: %s", response.text)
        raise exception.StopExtraction()

    def _pagination(self, endpoint, params):
        while True:
            data = self._call(endpoint, params)
            yield from data["illusts"]

            if not data["next_url"]:
                return
            query = data["next_url"].rpartition("?")[2]
            params = text.parse_query(query)
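Unlike the removed Public-API code, which stepped through numbered pages, _pagination() above follows the next_url cursor the App API returns. With made-up response data, the flow looks like this:

# hypothetical API responses, for illustration only
page1 = {"illusts": [{"id": 1}, {"id": 2}],
         "next_url": "https://app-api.pixiv.net/v1/user/illusts?offset=30"}
page2 = {"illusts": [{"id": 3}],
         "next_url": None}
# 1st iteration: yields illusts 1 and 2, then parses "offset=30" from
#                next_url into the params of the next request
# 2nd iteration: yields illust 3; next_url is empty, so the generator stops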

@ -9,7 +9,7 @@
"""Extract comic-issues and entire comics from http://readcomiconline.to/"""
from .common import ChapterExtractor, MangaExtractor
from .. import text, util, cloudflare
from .. import text, cloudflare
import re
@ -56,7 +56,7 @@ class ReadcomiconlineComicExtractor(ReadcomiconlineBase, MangaExtractor):
issue = issue[7:]
results.append((self.root + url, {
"comic": comic, "issue": issue,
"issue_id": util.safe_int(url.rpartition("=")[2]),
"issue_id": text.parse_int(url.rpartition("=")[2]),
"lang": "en", "language": "English",
}))
return results
@ -84,7 +84,7 @@ class ReadcomiconlineIssueExtractor(ReadcomiconlineBase, ChapterExtractor):
return {
"comic": comic,
"issue": match.group(1) or match.group(2),
"issue_id": util.safe_int(self.issue_id),
"issue_id": text.parse_int(self.issue_id),
"lang": "en",
"language": "English",
}

@ -82,16 +82,16 @@ class SankakuExtractor(SharedConfigExtractor):
file_url = extr(page, '<embed src="', '"', pos)[0]
return {
"id": util.safe_int(post_id),
"id": text.parse_int(post_id),
"md5": file_url.rpartition("/")[2].partition(".")[0],
"tags": tags,
"vote_average": float(vavg or 0),
"vote_count": util.safe_int(vcnt),
"vote_count": text.parse_int(vcnt),
"created_at": created,
"rating": (rating or "?")[0].lower(),
"file_url": "https:" + text.unescape(file_url),
"width": util.safe_int(width),
"height": util.safe_int(height),
"width": text.parse_int(width),
"height": text.parse_int(height),
}
def wait(self):
@ -165,8 +165,8 @@ class SankakuTagExtractor(SankakuExtractor):
SankakuExtractor.__init__(self)
query = text.parse_query(match.group(1))
self.tags = text.unquote(query.get("tags", "").replace("+", " "))
self.start_page = util.safe_int(query.get("page"), 1)
self.next = util.safe_int(query.get("next"), 0)
self.start_page = text.parse_int(query.get("page"), 1)
self.next = text.parse_int(query.get("next"), 0)
def skip(self, num):
if self.next:
@ -212,7 +212,7 @@ class SankakuTagExtractor(SankakuExtractor):
yield from ids
params["page"] = 2
params["next"] = util.safe_int(ids[-1]) - 1
params["next"] = text.parse_int(ids[-1]) - 1
class SankakuPoolExtractor(SankakuExtractor):

@ -123,11 +123,11 @@ class SeigaUserExtractor(SeigaExtractor):
return {
"user": {
"id": util.safe_int(self.user_id),
"id": text.parse_int(self.user_id),
"name": data["name"],
"message": (data["msg"] or "").strip(),
},
"count": util.safe_int(data["count"]),
"count": text.parse_int(data["count"]),
}
def get_images(self):
@ -152,7 +152,7 @@ class SeigaUserExtractor(SeigaExtractor):
("clips" , '</span>', '</li>'),
))[0]
for key in ("image_id", "views", "comments", "clips"):
data[key] = util.safe_int(data[key])
data[key] = text.parse_int(data[key])
yield data
cnt += 1
@ -188,4 +188,4 @@ class SeigaImageExtractor(SeigaExtractor):
return num
def get_images(self):
return ({}, {"image_id": util.safe_int(self.image_id)})
return ({}, {"image_id": text.parse_int(self.image_id)})

@ -9,7 +9,7 @@
"""Extract manga-chapters from from http://raw.senmanga.com/"""
from .common import Extractor, Message
from .. import text, util
from .. import text
class SenmangaChapterExtractor(Extractor):
@ -59,7 +59,7 @@ class SenmangaChapterExtractor(Extractor):
return {
"manga": text.unescape(manga),
"chapter_string": chapter.partition(" - Page ")[0],
"count": util.safe_int(count),
"count": text.parse_int(count),
"lang": "jp",
"language": "Japanese",
}

@ -9,7 +9,7 @@
"""Extract images from https://www.slideshare.net/"""
from .common import Extractor, Message
from .. import text, util
from .. import text
class SlidesharePresentationExtractor(Extractor):
@ -78,7 +78,7 @@ class SlidesharePresentationExtractor(Extractor):
"presentation": self.presentation,
"title": text.unescape(title.strip()),
"description": text.unescape(descr),
"views": util.safe_int(views.replace(",", "")),
"views": text.parse_int(views.replace(",", "")),
"published": published,
}

@ -9,7 +9,7 @@
"""Extract images from https://www.smugmug.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, oauth, exception
BASE_PATTERN = (
r"(?:smugmug:(?!album:)(?:https?://)?([^/]+)|"
@ -171,7 +171,7 @@ class SmugmugPathExtractor(SmugmugExtractor):
yield from self.album_nodes(node)
class SmugmugAPI():
class SmugmugAPI(oauth.OAuth1API):
"""Minimal interface for the smugmug API v2"""
API_DOMAIN = "api.smugmug.com"
API_KEY = "DFqxg4jf7GrtsQ5PnbNB8899zKfnDrdK"
@ -179,25 +179,6 @@ class SmugmugAPI():
"9nMFQm25ndGBzNPnwRDbRnnVBvqt4xTq")
HEADERS = {"Accept": "application/json"}
def __init__(self, extractor):
api_key = extractor.config("api-key", self.API_KEY)
api_secret = extractor.config("api-secret", self.API_SECRET)
token = extractor.config("access-token")
token_secret = extractor.config("access-token-secret")
if api_key and api_secret and token and token_secret:
self.session = util.OAuthSession(
extractor.session,
api_key, api_secret,
token, token_secret,
)
self.api_key = None
else:
self.session = extractor.session
self.api_key = api_key
self.log = extractor.log
def album(self, album_id, expands=None):
return self._expansion("album/" + album_id, expands)

@ -9,7 +9,7 @@
"""Extract images from https://www.tumblr.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, oauth, exception
from datetime import datetime, timedelta
import re
import time
@ -53,8 +53,8 @@ class TumblrExtractor(Extractor):
"""Base class for tumblr extractors"""
category = "tumblr"
directory_fmt = ["{category}", "{name}"]
filename_fmt = "{category}_{blog_name}_{id}o{offset}.{extension}"
archive_fmt = "{id}_{offset}"
filename_fmt = "{category}_{blog_name}_{id}_{num:>02}.{extension}"
archive_fmt = "{id}_{num}"
def __init__(self, match):
Extractor.__init__(self)
@ -88,7 +88,7 @@ class TumblrExtractor(Extractor):
post["reblogged"] = reblog
post["blog"] = blog
post["offset"] = 0
post["num"] = 0
if "trail" in post:
del post["trail"]
@ -149,14 +149,14 @@ class TumblrExtractor(Extractor):
@staticmethod
def _prepare(url, post):
text.nameext_from_url(url, post)
post["offset"] += 1
post["num"] += 1
post["hash"] = post["name"].partition("_")[2]
return Message.Url, url, post
@staticmethod
def _prepare_image(url, post):
text.nameext_from_url(url, post)
post["offset"] += 1
post["num"] += 1
parts = post["name"].split("_")
post["hash"] = parts[1] if parts[1] != "inline" else parts[2]
@ -238,7 +238,7 @@ class TumblrLikesExtractor(TumblrExtractor):
"""Extractor for images from a tumblr-user's liked posts"""
subcategory = "likes"
directory_fmt = ["{category}", "{name}", "likes"]
archive_fmt = "f_{blog[name]}_{id}_{offset}"
archive_fmt = "f_{blog[name]}_{id}_{num}"
pattern = [BASE_PATTERN + r"/likes"]
test = [("http://mikf123.tumblr.com/likes", {
"count": 1,
@ -248,31 +248,15 @@ class TumblrLikesExtractor(TumblrExtractor):
return self.api.likes(self.blog)
class TumblrAPI():
class TumblrAPI(oauth.OAuth1API):
"""Minimal interface for the Tumblr API v2"""
API_KEY = "O3hU2tMi5e4Qs5t3vezEi6L0qRORJ5y9oUpSGsrWu8iA3UCc3B"
API_SECRET = "sFdsK3PDdP2QpYMRAoq0oDnw0sFS24XigXmdfnaeNZpJpqAn03"
BLOG_CACHE = {}
def __init__(self, extractor):
api_key = extractor.config("api-key", self.API_KEY)
api_secret = extractor.config("api-secret", self.API_SECRET)
token = extractor.config("access-token")
token_secret = extractor.config("access-token-secret")
if api_key and api_secret and token and token_secret:
self.session = util.OAuthSession(
extractor.session,
api_key, api_secret,
token, token_secret,
)
self.api_key = None
else:
self.session = extractor.session
self.api_key = api_key
oauth.OAuth1API.__init__(self, extractor)
self.posts_type = None
self.log = extractor.log
def info(self, blog):
"""Return general information about a blog"""

@ -9,7 +9,7 @@
"""Extract images from https://www.xvideos.com/"""
from .common import Extractor, Message
from .. import text, util, exception
from .. import text, exception
import json
@ -57,7 +57,7 @@ class XvideosGalleryExtractor(XvideosExtractor):
yield Message.Version, 1
yield Message.Directory, data
for url in imgs:
data["num"] = util.safe_int(url.rsplit("_", 2)[1])
data["num"] = text.parse_int(url.rsplit("_", 2)[1])
data["extension"] = url.rpartition(".")[2]
yield Message.Url, url, data
@ -73,14 +73,14 @@ class XvideosGalleryExtractor(XvideosExtractor):
return {
"user": {
"id": util.safe_int(data["userid"]),
"id": text.parse_int(data["userid"]),
"name": self.user,
"display": data["display"],
"description": text.remove_html(data["descr"]).strip(),
},
"tags": text.unescape(data["tags"] or "").strip().split(", "),
"title": text.unescape(data["title"]),
"gallery_id": util.safe_int(self.gid),
"gallery_id": text.parse_int(self.gid),
}
@staticmethod
@ -123,7 +123,7 @@ class XvideosUserExtractor(XvideosExtractor):
del data["galleries"]["0"]
galleries = [
{"gallery_id": util.safe_int(gid),
{"gallery_id": text.parse_int(gid),
"title": text.unescape(gdata["title"]),
"count": gdata["nb_pics"]}
for gid, gdata in data["galleries"].items()

@ -17,7 +17,7 @@ from .extractor.message import Message
class Job():
"""Base class for Job-types"""
ufile = None
ulog = None
def __init__(self, url, parent=None):
self.url = url
@ -144,8 +144,8 @@ class Job():
kwdict.update(self.userkwds)
def _write_unsupported(self, url):
if self.ufile:
print(url, file=self.ufile, flush=True)
if self.ulog:
self.ulog.info(url)
class DownloadJob(Job):

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""OAuth helper functions and classes"""
import hmac
import time
import base64
import random
import string
import hashlib
import urllib.parse
import requests
import requests.auth
from . import text
def nonce(size, alphabet=string.ascii_letters):
"""Generate a nonce value with 'size' characters"""
return "".join(random.choice(alphabet) for _ in range(size))
def quote(value, quote=urllib.parse.quote):
"""Quote 'value' according to the OAuth1.0 standard"""
return quote(value, "~")
def concat(*args):
"""Concatenate 'args' as expected by OAuth1.0"""
return "&".join(quote(item) for item in args)
class OAuth1Session(requests.Session):
"""Extension to requests.Session to support OAuth 1.0"""
def __init__(self, consumer_key, consumer_secret,
token=None, token_secret=None):
requests.Session.__init__(self)
self.auth = OAuth1Client(
consumer_key, consumer_secret,
token, token_secret,
)
def rebuild_auth(self, prepared_request, response):
if "Authorization" in prepared_request.headers:
del prepared_request.headers["Authorization"]
prepared_request.prepare_auth(self.auth)
class OAuth1Client(requests.auth.AuthBase):
"""OAuth1.0a authentication"""
def __init__(self, consumer_key, consumer_secret,
token=None, token_secret=None):
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.token = token
self.token_secret = token_secret
def __call__(self, request):
oauth_params = [
("oauth_consumer_key", self.consumer_key),
("oauth_nonce", nonce(16)),
("oauth_signature_method", "HMAC-SHA1"),
("oauth_timestamp", str(int(time.time()))),
("oauth_version", "1.0"),
]
if self.token:
oauth_params.append(("oauth_token", self.token))
signature = self.generate_signature(request, oauth_params)
oauth_params.append(("oauth_signature", signature))
request.headers["Authorization"] = "OAuth " + ",".join(
key + '="' + value + '"' for key, value in oauth_params)
return request
def generate_signature(self, request, params):
"""Generate 'oauth_signature' value"""
url, _, query = request.url.partition("?")
params = params.copy()
for key, value in text.parse_query(query).items():
params.append((quote(key), quote(value)))
params.sort()
query = "&".join("=".join(item) for item in params)
message = concat(request.method, url, query).encode()
key = concat(self.consumer_secret, self.token_secret or "").encode()
signature = hmac.new(key, message, hashlib.sha1).digest()
return quote(base64.b64encode(signature).decode())
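generate_signature() builds the RFC 5849 signature base string (uppercase method, base URL, and the sorted, individually encoded parameters, joined by '&') and signs it with HMAC-SHA1 under '<consumer_secret>&<token_secret>'. A self-contained sketch of the same computation (simplified; parameter pairs are assumed to be pre-encoded):

import base64
import hashlib
import hmac
import urllib.parse

def _q(value):
    return urllib.parse.quote(value, "~")

def sign_sketch(method, url, params, consumer_secret, token_secret=""):
    query = "&".join("=".join(pair) for pair in sorted(params))
    message = "&".join(_q(s) for s in (method, url, query)).encode()
    key = "&".join(_q(s) for s in (consumer_secret, token_secret)).encode()
    digest = hmac.new(key, message, hashlib.sha1).digest()
    return _q(base64.b64encode(digest).decode())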
class OAuth1API():
"""Base class for OAuth1.0 based API interfaces"""
API_KEY = None
API_SECRET = None
def __init__(self, extractor):
self.log = extractor.log
api_key = extractor.config("api-key", self.API_KEY)
api_secret = extractor.config("api-secret", self.API_SECRET)
token = extractor.config("access-token")
token_secret = extractor.config("access-token-secret")
if api_key and api_secret and token and token_secret:
self.log.debug("Using OAuth1.0 authentication")
self.session = OAuth1Session(
api_key, api_secret, token, token_secret)
self.api_key = None
else:
self.log.debug("Using api_key authentication")
self.session = extractor.session
self.api_key = api_key
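For context, a minimal usage sketch of these classes (placeholder credentials; the album key is hypothetical, only the API domain comes from the SmugMug extractor earlier in this commit):

session = OAuth1Session(
    "consumer-key", "consumer-secret",
    "access-token", "access-token-secret")
response = session.get(
    "https://api.smugmug.com/api/v2/album/ALBUMKEY",
    headers={"Accept": "application/json"})

Every request through such a session gets a fresh nonce, timestamp, and signature from OAuth1Client, and rebuild_auth() re-signs redirected requests instead of replaying the stale Authorization header.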

@ -8,37 +8,47 @@
"""Collection of functions that work in strings/text"""
import sys
import re
import os.path
import html
import os.path
import urllib.parse
INVALID_XML_CHARS = (1, 2, 3, 4, 5, 6, 7, 8, 11, 12, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31)
INVALID_XML_CHARS = (
"\x00", "\x01", "\x02", "\x03", "\x04", "\x05", "\x06", "\x07",
"\x08", "\x0b", "\x0c", "\x0e", "\x0f", "\x10", "\x11", "\x12",
"\x13", "\x14", "\x15", "\x16", "\x17", "\x18", "\x19", "\x1a",
"\x1b", "\x1c", "\x1d", "\x1e", "\x1f",
)
def clean_xml(xmldata, repl=""):
"""Replace/Remove invalid control characters in XML data"""
"""Replace/Remove invalid control characters in 'xmldata'"""
if not isinstance(xmldata, str):
try:
xmldata = "".join(xmldata)
except TypeError:
return ""
for char in INVALID_XML_CHARS:
char = chr(char)
if char in xmldata:
xmldata = xmldata.replace(char, repl)
return xmldata
def remove_html(text):
def remove_html(txt):
"""Remove html-tags from a string"""
return " ".join(re.sub("<[^>]+?>", " ", text).split())
try:
return " ".join(re.sub("<[^>]+>", " ", txt).split())
except TypeError:
return ""
def filename_from_url(url):
"""Extract the last part of an url to use as a filename"""
try:
return urllib.parse.urlsplit(url).path.rpartition("/")[2]
except ValueError:
return url
except (TypeError, AttributeError):
return ""
def nameext_from_url(url, data=None):
@ -56,7 +66,7 @@ def clean_path_windows(path):
try:
return re.sub(r'[<>:"\\/|?*]', "_", path)
except TypeError:
return path
return ""
def clean_path_posix(path):
@ -64,20 +74,7 @@ def clean_path_posix(path):
try:
return path.replace("/", "_")
except AttributeError:
return path
def shorten_path(path, limit=255, encoding=sys.getfilesystemencoding()):
"""Shorten a path segment to at most 'limit' bytes"""
return (path.encode(encoding)[:limit]).decode(encoding, "ignore")
def shorten_filename(fname, limit=255, encoding=sys.getfilesystemencoding()):
"""Shorten filename to at most 'limit' bytes while preserving extension"""
name, extension = os.path.splitext(fname)
bext = extension.encode(encoding)
bname = name.encode(encoding)[:limit-len(bext)]
return bname.decode(encoding, "ignore") + extension
return ""
def extract(txt, begin, end, pos=0):
@ -104,7 +101,7 @@ def extract(txt, begin, end, pos=0):
first = txt.index(begin, pos) + len(begin)
last = txt.index(end, first)
return txt[first:last], last+len(end)
except ValueError:
except (ValueError, TypeError, AttributeError):
return None, pos
@ -128,12 +125,44 @@ def extract_iter(txt, begin, end, pos=0):
yield value
def parse_bytes(value, default=0, suffixes="bkmgtp"):
"""Convert a bytes-amount ("500k", "2.5M", ...) to int"""
try:
last = value[-1].lower()
except (TypeError, KeyError, IndexError):
return default
if last in suffixes:
mul = 1024 ** suffixes.index(last)
value = value[:-1]
else:
mul = 1
try:
return round(float(value) * mul)
except ValueError:
return default
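For example (results consistent with the test cases at the end of this commit):

parse_bytes("50k")       # 50 * 1024            -> 51200
parse_bytes("0.5M")      # round(0.5 * 1024**2) -> 524288
parse_bytes("123.567")   # no suffix, rounded   -> 124
parse_bytes(" 123 kb ")  # unparseable          -> default (0)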
def parse_int(value, default=0):
"""Convert 'value' to int"""
if not value:
return default
try:
return int(value)
except (ValueError, TypeError):
return default
def parse_query(qs):
"""Parse a query string into key-value pairs"""
result = {}
for key, value in urllib.parse.parse_qsl(qs):
if key not in result:
result[key] = value
try:
for key, value in urllib.parse.parse_qsl(qs):
if key not in result:
result[key] = value
except AttributeError:
pass
return result
@ -142,6 +171,7 @@ if os.name == "nt":
else:
clean_path = clean_path_posix
urljoin = urllib.parse.urljoin
unquote = urllib.parse.unquote
escape = html.escape

@ -11,14 +11,9 @@
import re
import os
import sys
import hmac
import time
import base64
import random
import shutil
import string
import _string
import hashlib
import sqlite3
import datetime
import itertools
@ -95,22 +90,6 @@ def bdecode(data, alphabet="0123456789"):
return num
def parse_bytes(value, suffixes="bkmgtp"):
"""Convert a bytes-amount ("500k", "2.5M", ...) to int"""
last = value[-1].lower()
if last in suffixes:
mul = 1024 ** suffixes.index(last)
value = value[:-1]
else:
mul = 1
try:
return round(float(value) * mul)
except ValueError:
return 0
def advance(iterable, num):
""""Advance the iterable by 'num' steps"""
iterator = iter(iterable)
@ -135,16 +114,6 @@ def combine_dict(a, b):
return a
def safe_int(value, default=0):
"""Safely convert value to integer"""
if value is None or value == "":
return default
try:
return int(value)
except (ValueError, TypeError):
return default
def expand_path(path):
"""Expand environment variables and tildes (~)"""
if not path:
@ -253,7 +222,7 @@ class UniquePredicate():
class FilterPredicate():
"""Predicate; True if evaluating the given expression returns True"""
globalsdict = {
"safe_int": safe_int,
"parse_int": text.parse_int,
"urlsplit": urllib.parse.urlsplit,
"datetime": datetime.datetime,
"abort": raises(exception.StopExtraction()),
@ -523,54 +492,6 @@ class PathFormat():
return "\\\\?\\" + os.path.abspath(path) if os.name == "nt" else path
class OAuthSession():
"""Minimal wrapper for requests.session objects to support OAuth 1.0"""
def __init__(self, session, consumer_key, consumer_secret,
token=None, token_secret=None):
self.session = session
self.consumer_secret = consumer_secret
self.token_secret = token_secret or ""
self.params = {}
self.params["oauth_consumer_key"] = consumer_key
self.params["oauth_token"] = token
self.params["oauth_signature_method"] = "HMAC-SHA1"
self.params["oauth_version"] = "1.0"
def get(self, url, params, **kwargs):
params.update(self.params)
params["oauth_nonce"] = self.nonce(16)
params["oauth_timestamp"] = int(time.time())
return self.session.get(url + self.sign(url, params), **kwargs)
def sign(self, url, params):
"""Generate 'oauth_signature' value and return query string"""
query = self.urlencode(params)
message = self.concat("GET", url, query).encode()
key = self.concat(self.consumer_secret, self.token_secret).encode()
signature = hmac.new(key, message, hashlib.sha1).digest()
return "?{}&oauth_signature={}".format(
query, self.quote(base64.b64encode(signature).decode()))
@staticmethod
def concat(*args):
return "&".join(OAuthSession.quote(item) for item in args)
@staticmethod
def nonce(N, alphabet=string.ascii_letters):
return "".join(random.choice(alphabet) for _ in range(N))
@staticmethod
def quote(value, quote=urllib.parse.quote):
return quote(value, "~")
@staticmethod
def urlencode(params):
return "&".join(
OAuthSession.quote(str(key)) + "=" + OAuthSession.quote(str(value))
for key, value in sorted(params.items()) if value
)
class DownloadArchive():
def __init__(self, path, extractor):

@ -6,4 +6,4 @@
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
__version__ = "1.3.6-dev"
__version__ = "1.4.0-dev"

@ -87,6 +87,7 @@ AUTH_MAP = {
"reddit" : "Optional (OAuth)",
"sankaku" : "Optional",
"seiga" : "Required",
"smugmug" : "Optional (OAuth)",
"tumblr" : "Optional (OAuth)",
}

@ -8,10 +8,8 @@
# published by the Free Software Foundation.
import unittest
import requests
from gallery_dl import text
from gallery_dl.util import OAuthSession
from gallery_dl import oauth, text
TESTSERVER = "http://oauthbin.com"
CONSUMER_KEY = "key"
@ -25,7 +23,7 @@ ACCESS_TOKEN_SECRET = "accesssecret"
class TestOAuthSession(unittest.TestCase):
def test_concat(self):
concat = OAuthSession.concat
concat = oauth.concat
self.assertEqual(concat(), "")
self.assertEqual(concat("str"), "str")
@ -37,18 +35,18 @@ class TestOAuthSession(unittest.TestCase):
"GET&http%3A%2F%2Fexample.org%2F&foo%3Dbar%26baz%3Da"
)
def test_nonce(self, N=16):
nonce_values = set(OAuthSession.nonce(N) for _ in range(N))
def test_nonce(self, size=16):
nonce_values = set(oauth.nonce(size) for _ in range(size))
# uniqueness
self.assertEqual(len(nonce_values), N)
self.assertEqual(len(nonce_values), size)
# length
for nonce in nonce_values:
self.assertEqual(len(nonce), N)
self.assertEqual(len(nonce), size)
def test_quote(self):
quote = OAuthSession.quote
quote = oauth.quote
reserved = ",;:!\"§$%&/(){}[]=?`´+*'äöü"
unreserved = ("ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@ -65,33 +63,6 @@ class TestOAuthSession(unittest.TestCase):
self.assertTrue(len(quoted) >= 3)
self.assertEqual(quoted_hex.upper(), quoted_hex)
def test_urlencode(self):
urlencode = OAuthSession.urlencode
self.assertEqual(urlencode({}), "")
self.assertEqual(urlencode({"foo": "bar"}), "foo=bar")
self.assertEqual(
urlencode({"foo": "bar", "baz": "a", "a": "baz"}),
"a=baz&baz=a&foo=bar"
)
self.assertEqual(
urlencode({
"oauth_consumer_key": "0685bd9184jfhq22",
"oauth_token": "ad180jjd733klru7",
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": 137131200,
"oauth_nonce": "4572616e48616d6d65724c61686176",
"oauth_version": "1.0"
}),
"oauth_consumer_key=0685bd9184jfhq22&"
"oauth_nonce=4572616e48616d6d65724c61686176&"
"oauth_signature_method=HMAC-SHA1&"
"oauth_timestamp=137131200&"
"oauth_token=ad180jjd733klru7&"
"oauth_version=1.0"
)
def test_request_token(self):
response = self._oauth_request(
"/v1/request-token", {})
@ -113,23 +84,20 @@ class TestOAuthSession(unittest.TestCase):
self.assertTrue(data["oauth_token_secret"], ACCESS_TOKEN_SECRET)
def test_authenticated_call(self):
params = {"method": "foo", "bar": "baz", "a": "äöüß/?&#"}
params = {"method": "foo", "a": "äöüß/?&#", "äöüß/?&#": "a"}
response = self._oauth_request(
"/v1/echo", params, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
expected = OAuthSession.urlencode(params)
self.assertEqual(response, expected, msg=response)
self.assertEqual(text.parse_query(response), params)
def _oauth_request(self, endpoint, params=None,
oauth_token=None, oauth_token_secret=None):
session = OAuthSession(
requests.session(),
session = oauth.OAuth1Session(
CONSUMER_KEY, CONSUMER_SECRET,
oauth_token, oauth_token_secret,
)
url = TESTSERVER + endpoint
return session.get(url, params.copy()).text
return session.get(url, params=params).text
if __name__ == "__main__":

@ -22,7 +22,6 @@ TRAVIS_SKIP = {
# temporary issues, etc.
BROKEN = {
"pixiv", # /users/<id>/favorite_works API endpoint is gone
}

@ -1,158 +1,262 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2015-2018 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
import unittest
from gallery_dl import text
INVALID = ((), [], {}, None, 1, 2.3)
INVALID_ALT = ((), [], {}, None, "")
class TestText(unittest.TestCase):
    def test_clean_xml(self, f=text.clean_xml):
        # standard usage
        self.assertEqual(f(""), "")
        self.assertEqual(f("foo"), "foo")
        self.assertEqual(f("\tfoo\nbar\r"), "\tfoo\nbar\r")
        self.assertEqual(f("<foo>\ab\ba\fr\v</foo>"), "<foo>bar</foo>")

        # 'repl' argument
        repl = "#"
        self.assertEqual(f("", repl), "")
        self.assertEqual(f("foo", repl), "foo")
        self.assertEqual(f("\tfoo\nbar\r", repl), "\tfoo\nbar\r")
        self.assertEqual(
            f("<foo>\ab\ba\fr\v</foo>", repl), "<foo>#b#a#r#</foo>")

        # removal of all illegal control characters
        value = "".join(chr(x) for x in range(32))
        self.assertEqual(f(value), "\t\n\r")

        # 'invalid' arguments
        for value in INVALID:
            self.assertEqual(f(value), "")

    def test_remove_html(self, f=text.remove_html):
        result = "Hello World."

        # standard usage
        self.assertEqual(f(""), "")
        self.assertEqual(f("Hello World."), result)
        self.assertEqual(f(" Hello World. "), result)
        self.assertEqual(f("Hello<br/>World."), result)
        self.assertEqual(
            f("<div><b class='a'>Hello</b><i>World.</i></div>"), result)

        # empty HTML
        self.assertEqual(f("<div></div>"), "")
        self.assertEqual(f(" <div> </div> "), "")

        # malformed HTML
        self.assertEqual(f("<div</div>"), "")
        self.assertEqual(f("<div<Hello World.</div>"), "")

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), "")

    def test_filename_from_url(self, f=text.filename_from_url):
        result = "filename.ext"

        # standard usage
        self.assertEqual(f(""), "")
        self.assertEqual(f("filename.ext"), result)
        self.assertEqual(f("/filename.ext"), result)
        self.assertEqual(f("example.org/filename.ext"), result)
        self.assertEqual(f("http://example.org/v2/filename.ext"), result)
        self.assertEqual(
            f("http://example.org/v2/filename.ext?param=value#frag"), result)

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), "")

    def test_nameext_from_url(self, f=text.nameext_from_url):
        empty = {"filename": "", "name": "", "extension": ""}
        result = {"filename": "filename.ext",
                  "name": "filename", "extension": "ext"}

        # standard usage
        self.assertEqual(f(""), empty)
        self.assertEqual(f("filename.ext"), result)
        self.assertEqual(f("/filename.ext"), result)
        self.assertEqual(f("example.org/filename.ext"), result)
        self.assertEqual(f("http://example.org/v2/filename.ext"), result)
        self.assertEqual(
            f("http://example.org/v2/filename.ext?param=value#frag"), result)

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), empty)

    def test_clean_path_windows(self, f=text.clean_path_windows):
        self.assertEqual(f(""), "")
        self.assertEqual(f("foo"), "foo")
        self.assertEqual(f("foo/bar"), "foo_bar")
        self.assertEqual(f("foo<>:\"\\/|?*bar"), "foo_________bar")

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), "")

    def test_clean_path_posix(self, f=text.clean_path_posix):
        self.assertEqual(f(""), "")
        self.assertEqual(f("foo"), "foo")
        self.assertEqual(f("foo/bar"), "foo_bar")
        self.assertEqual(f("foo<>:\"\\/|?*bar"), "foo<>:\"\\_|?*bar")

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), "")

    def test_extract(self, f=text.extract):
        txt = "<a><b>"
        self.assertEqual(f(txt, "<", ">"), ("a", 3))
        self.assertEqual(f(txt, "X", ">"), (None, 0))
        self.assertEqual(f(txt, "<", "X"), (None, 0))

        # 'pos' argument
        for i in range(1, 4):
            self.assertEqual(f(txt, "<", ">", i), ("b", 6))
        for i in range(4, 10):
            self.assertEqual(f(txt, "<", ">", i), (None, i))

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value, "<", ">"), (None, 0))
            self.assertEqual(f(txt, value, ">"), (None, 0))
            self.assertEqual(f(txt, "<", value), (None, 0))

    def test_extract_all(self, f=text.extract_all):
        txt = "[c][b][a]: xyz! [d][e"

        self.assertEqual(
            f(txt, ()), ({}, 0))
        self.assertEqual(
            f(txt, (("C", "[", "]"), ("B", "[", "]"), ("A", "[", "]"))),
            ({"A": "a", "B": "b", "C": "c"}, 9),
        )

        # 'None' as field name
        self.assertEqual(
            f(txt, ((None, "[", "]"), (None, "[", "]"), ("A", "[", "]"))),
            ({"A": "a"}, 9),
        )
        self.assertEqual(
            f(txt, ((None, "[", "]"), (None, "[", "]"), (None, "[", "]"))),
            ({}, 9),
        )

        # failed matches
        self.assertEqual(
            f(txt, (("C", "[", "]"), ("X", "X", "X"), ("B", "[", "]"))),
            ({"B": "b", "C": "c", "X": None}, 6),
        )

        # 'pos' argument
        self.assertEqual(
            f(txt, (("B", "[", "]"), ("A", "[", "]")), pos=1),
            ({"A": "a", "B": "b"}, 9),
        )

        # 'values' argument
        self.assertEqual(
            f(txt, (("C", "[", "]"),), values={"A": "a", "B": "b"}),
            ({"A": "a", "B": "b", "C": "c"}, 3),
        )

        vdict = {}
        rdict, pos = f(txt, (), values=vdict)
        self.assertIs(vdict, rdict)

    def test_extract_iter(self, f=text.extract_iter):
        txt = "[c][b][a]: xyz! [d][e"

        def g(*args):
            return list(f(*args))

        self.assertEqual(
            g("", "[", "]"), [])
        self.assertEqual(
            g("[a]", "[", "]"), ["a"])
        self.assertEqual(
            g(txt, "[", "]"), ["c", "b", "a", "d"])
        self.assertEqual(
            g(txt, "X", "X"), [])
        self.assertEqual(
            g(txt, "[", "]", 6), ["a", "d"])

    def test_parse_bytes(self, f=text.parse_bytes):
        self.assertEqual(f("0"), 0)
        self.assertEqual(f("50"), 50)
        self.assertEqual(f("50k"), 50 * 1024**1)
        self.assertEqual(f("50m"), 50 * 1024**2)
        self.assertEqual(f("50g"), 50 * 1024**3)
        self.assertEqual(f("50t"), 50 * 1024**4)
        self.assertEqual(f("50p"), 50 * 1024**5)

        # fractions
        self.assertEqual(f("123.456"), 123)
        self.assertEqual(f("123.567"), 124)
        self.assertEqual(f("0.5M"), round(0.5 * 1024**2))

        # invalid arguments
        for value in INVALID_ALT:
            self.assertEqual(f(value), 0)
        self.assertEqual(f("NaN"), 0)
        self.assertEqual(f("invalid"), 0)
        self.assertEqual(f(" 123 kb "), 0)

    def test_parse_int(self, f=text.parse_int):
        self.assertEqual(f(0), 0)
        self.assertEqual(f("0"), 0)
        self.assertEqual(f(123), 123)
        self.assertEqual(f("123"), 123)

        # invalid arguments
        for value in INVALID_ALT:
            self.assertEqual(f(value), 0)
        self.assertEqual(f("123.456"), 0)
        self.assertEqual(f("zzz"), 0)
        self.assertEqual(f([1, 2, 3]), 0)
        self.assertEqual(f({1: 2, 3: 4}), 0)

        # 'default' argument
        default = "default"
        for value in INVALID_ALT:
            self.assertEqual(f(value, default), default)
        self.assertEqual(f("zzz", default), default)

    def test_parse_query(self, f=text.parse_query):
        # standard usage
        self.assertEqual(f(""), {})
        self.assertEqual(f("foo=1"), {"foo": "1"})
        self.assertEqual(f("foo=1&bar=2"), {"foo": "1", "bar": "2"})

        # missing value
        self.assertEqual(f("bar"), {})
        self.assertEqual(f("foo=1&bar"), {"foo": "1"})
        self.assertEqual(f("foo=1&bar&baz=3"), {"foo": "1", "baz": "3"})

        # keys with identical names
        self.assertEqual(f("foo=1&foo=2"), {"foo": "1"})
        self.assertEqual(
            f("foo=1&bar=2&foo=3&bar=4"),
            {"foo": "1", "bar": "2"},
        )

        # invalid arguments
        for value in INVALID:
            self.assertEqual(f(value), {})
if __name__ == '__main__':

@ -227,22 +227,6 @@ class TestOther(unittest.TestCase):
result = util.bdecode(util.bencode(value, alphabet), alphabet)
self.assertEqual(result, value)
def test_parse_bytes(self):
self.assertEqual(util.parse_bytes("50"), 50)
self.assertEqual(util.parse_bytes("50k"), 50 * 1024**1)
self.assertEqual(util.parse_bytes("50m"), 50 * 1024**2)
self.assertEqual(util.parse_bytes("50g"), 50 * 1024**3)
self.assertEqual(util.parse_bytes("50t"), 50 * 1024**4)
self.assertEqual(util.parse_bytes("50p"), 50 * 1024**5)
self.assertEqual(util.parse_bytes("123.456"), 123)
self.assertEqual(util.parse_bytes("123.567"), 124)
self.assertEqual(util.parse_bytes("0.5M"), round(0.5 * 1024**2))
self.assertEqual(util.parse_bytes("NaN"), 0)
self.assertEqual(util.parse_bytes("invalid"), 0)
self.assertEqual(util.parse_bytes(" 123 kb "), 0)
def test_advance(self):
items = range(5)
@ -281,16 +265,6 @@ class TestOther(unittest.TestCase):
{1: {2: {3: {4: {"1": "A", "3": "C"}}}}}),
{1: {2: {3: {4: {"1": "A", "2": "b", "3": "C"}}}}})
def test_safe_int(self):
self.assertEqual(util.safe_int(123), 123)
self.assertEqual(util.safe_int("123"), 123)
self.assertEqual(util.safe_int("zzz"), 0)
self.assertEqual(util.safe_int(""), 0)
self.assertEqual(util.safe_int(None), 0)
self.assertEqual(util.safe_int("zzz", "default"), "default")
self.assertEqual(util.safe_int("", "default"), "default")
self.assertEqual(util.safe_int(None, "default"), "default")
if __name__ == '__main__':
unittest.main()
