|
|
|
@ -1,7 +1,7 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
# Copyright 2015-2022 Mike Fährmann
|
|
|
|
|
# Copyright 2015-2023 Mike Fährmann
|
|
|
|
|
#
|
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
|
|
@ -15,10 +15,12 @@ import re
|
|
|
|
|
import json
|
|
|
|
|
import hashlib
|
|
|
|
|
import datetime
|
|
|
|
|
import collections
|
|
|
|
|
|
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
from gallery_dl import \
|
|
|
|
|
extractor, util, job, config, exception, formatter # noqa E402
|
|
|
|
|
from test import results # noqa E402
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# temporary issues, etc.
|
|
|
|
@ -46,28 +48,33 @@ class TestExtractorResults(unittest.TestCase):
|
|
|
|
|
for url, exc in cls._skipped:
|
|
|
|
|
print('- {} ("{}")'.format(url, exc))
|
|
|
|
|
|
|
|
|
|
def _run_test(self, extr, url, result):
|
|
|
|
|
if result:
|
|
|
|
|
if "options" in result:
|
|
|
|
|
for key, value in result["options"]:
|
|
|
|
|
def _run_test(self, result):
|
|
|
|
|
result.pop("#comment", None)
|
|
|
|
|
only_matching = (len(result) <= 3)
|
|
|
|
|
|
|
|
|
|
if only_matching:
|
|
|
|
|
content = False
|
|
|
|
|
else:
|
|
|
|
|
if "#options" in result:
|
|
|
|
|
for key, value in result["#options"].items():
|
|
|
|
|
key = key.split(".")
|
|
|
|
|
config.set(key[:-1], key[-1], value)
|
|
|
|
|
if "range" in result:
|
|
|
|
|
config.set((), "image-range" , result["range"])
|
|
|
|
|
config.set((), "chapter-range", result["range"])
|
|
|
|
|
content = "content" in result
|
|
|
|
|
else:
|
|
|
|
|
content = False
|
|
|
|
|
if "#range" in result:
|
|
|
|
|
config.set((), "image-range" , result["#range"])
|
|
|
|
|
config.set((), "chapter-range", result["#range"])
|
|
|
|
|
content = ("#sha1_content" in result)
|
|
|
|
|
|
|
|
|
|
tjob = ResultJob(url, content=content)
|
|
|
|
|
self.assertEqual(extr, tjob.extractor.__class__)
|
|
|
|
|
tjob = ResultJob(result["#url"], content=content)
|
|
|
|
|
self.assertEqual(result["#class"], tjob.extractor.__class__)
|
|
|
|
|
|
|
|
|
|
if not result:
|
|
|
|
|
if only_matching:
|
|
|
|
|
return
|
|
|
|
|
if "exception" in result:
|
|
|
|
|
with self.assertRaises(result["exception"]):
|
|
|
|
|
|
|
|
|
|
if "#exception" in result:
|
|
|
|
|
with self.assertRaises(result["#exception"]):
|
|
|
|
|
tjob.run()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
tjob.run()
|
|
|
|
|
except exception.StopExtraction:
|
|
|
|
@ -76,11 +83,11 @@ class TestExtractorResults(unittest.TestCase):
|
|
|
|
|
exc = str(exc)
|
|
|
|
|
if re.match(r"'5\d\d ", exc) or \
|
|
|
|
|
re.search(r"\bRead timed out\b", exc):
|
|
|
|
|
self._skipped.append((url, exc))
|
|
|
|
|
self._skipped.append((result["#url"], exc))
|
|
|
|
|
self.skipTest(exc)
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
if result.get("archive", True):
|
|
|
|
|
if result.get("#archive", True):
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
len(set(tjob.archive_list)),
|
|
|
|
|
len(tjob.archive_list),
|
|
|
|
@ -92,7 +99,7 @@ class TestExtractorResults(unittest.TestCase):
|
|
|
|
|
for url, kwdict in zip(tjob.url_list, tjob.kwdict_list):
|
|
|
|
|
if "_extractor" in kwdict:
|
|
|
|
|
extr = kwdict["_extractor"].from_url(url)
|
|
|
|
|
if extr is None and not result.get("extractor", True):
|
|
|
|
|
if extr is None and not result.get("#extractor", True):
|
|
|
|
|
continue
|
|
|
|
|
self.assertIsInstance(extr, kwdict["_extractor"])
|
|
|
|
|
self.assertEqual(extr.url, url)
|
|
|
|
@ -102,27 +109,26 @@ class TestExtractorResults(unittest.TestCase):
|
|
|
|
|
self.assertIn("extension", kwdict)
|
|
|
|
|
|
|
|
|
|
# test extraction results
|
|
|
|
|
if "url" in result:
|
|
|
|
|
self.assertEqual(result["url"], tjob.url_hash.hexdigest())
|
|
|
|
|
if "#sha1_url" in result:
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
result["#sha1_url"], tjob.url_hash.hexdigest())
|
|
|
|
|
|
|
|
|
|
if "content" in result:
|
|
|
|
|
expected = result["content"]
|
|
|
|
|
if "#sha1_content" in result:
|
|
|
|
|
expected = result["#sha1_content"]
|
|
|
|
|
digest = tjob.content_hash.hexdigest()
|
|
|
|
|
if isinstance(expected, str):
|
|
|
|
|
self.assertEqual(digest, expected, "content")
|
|
|
|
|
else: # assume iterable
|
|
|
|
|
self.assertIn(digest, expected, "content")
|
|
|
|
|
|
|
|
|
|
if "keyword" in result:
|
|
|
|
|
expected = result["keyword"]
|
|
|
|
|
if isinstance(expected, dict):
|
|
|
|
|
for kwdict in tjob.kwdict_list:
|
|
|
|
|
self._test_kwdict(kwdict, expected)
|
|
|
|
|
else: # assume SHA1 hash
|
|
|
|
|
self.assertEqual(expected, tjob.kwdict_hash.hexdigest())
|
|
|
|
|
|
|
|
|
|
if "count" in result:
|
|
|
|
|
count = result["count"]
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
expected, digest, "content")
|
|
|
|
|
else: # iterable
|
|
|
|
|
self.assertIn(
|
|
|
|
|
digest, expected, "content")
|
|
|
|
|
|
|
|
|
|
if "#sha1_metadata" in result:
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
result["#sha1_metadata"], tjob.kwdict_hash.hexdigest())
|
|
|
|
|
|
|
|
|
|
if "#count" in result:
|
|
|
|
|
count = result["#count"]
|
|
|
|
|
if isinstance(count, str):
|
|
|
|
|
self.assertRegex(count, r"^ *(==|!=|<|<=|>|>=) *\d+ *$")
|
|
|
|
|
expr = "{} {}".format(len(tjob.url_list), count)
|
|
|
|
@ -130,10 +136,15 @@ class TestExtractorResults(unittest.TestCase):
|
|
|
|
|
else: # assume integer
|
|
|
|
|
self.assertEqual(len(tjob.url_list), count)
|
|
|
|
|
|
|
|
|
|
if "pattern" in result:
|
|
|
|
|
if "#pattern" in result:
|
|
|
|
|
self.assertGreater(len(tjob.url_list), 0)
|
|
|
|
|
for url in tjob.url_list:
|
|
|
|
|
self.assertRegex(url, result["pattern"])
|
|
|
|
|
self.assertRegex(url, result["#pattern"])
|
|
|
|
|
|
|
|
|
|
metadata = {k: v for k, v in result.items() if k[0] != "#"}
|
|
|
|
|
if metadata:
|
|
|
|
|
for kwdict in tjob.kwdict_list:
|
|
|
|
|
self._test_kwdict(kwdict, metadata)
|
|
|
|
|
|
|
|
|
|
def _test_kwdict(self, kwdict, tests):
|
|
|
|
|
for key, test in tests.items():
|
|
|
|
@ -352,39 +363,34 @@ def setup_test_config():
|
|
|
|
|
|
|
|
|
|
def generate_tests():
|
|
|
|
|
"""Dynamically generate extractor unittests"""
|
|
|
|
|
def _generate_test(extr, tcase):
|
|
|
|
|
def _generate_method(result):
|
|
|
|
|
def test(self):
|
|
|
|
|
url, result = tcase
|
|
|
|
|
print("\n", url, sep="")
|
|
|
|
|
self._run_test(extr, url, result)
|
|
|
|
|
print("\n" + result["#url"])
|
|
|
|
|
self._run_test(result)
|
|
|
|
|
return test
|
|
|
|
|
|
|
|
|
|
# enable selective testing for direct calls
|
|
|
|
|
if __name__ == '__main__' and len(sys.argv) > 1:
|
|
|
|
|
categories = sys.argv[1:]
|
|
|
|
|
negate = False
|
|
|
|
|
if categories[0].lower() == "all":
|
|
|
|
|
categories = ()
|
|
|
|
|
negate = True
|
|
|
|
|
elif categories[0].lower() == "broken":
|
|
|
|
|
categories = BROKEN
|
|
|
|
|
if __name__ == "__main__" and len(sys.argv) > 1:
|
|
|
|
|
category, _, subcategory = sys.argv[1].partition(":")
|
|
|
|
|
del sys.argv[1:]
|
|
|
|
|
|
|
|
|
|
tests = results.category(category)
|
|
|
|
|
if subcategory:
|
|
|
|
|
tests = [t for t in tests if t["#category"][-1] == subcategory]
|
|
|
|
|
else:
|
|
|
|
|
categories = BROKEN
|
|
|
|
|
negate = True
|
|
|
|
|
if categories:
|
|
|
|
|
print("skipping:", ", ".join(categories))
|
|
|
|
|
fltr = util.build_extractor_filter(categories, negate=negate)
|
|
|
|
|
tests = results.all()
|
|
|
|
|
|
|
|
|
|
# add 'test_...' methods
|
|
|
|
|
for extr in filter(fltr, extractor.extractors()):
|
|
|
|
|
name = "test_" + extr.__name__ + "_"
|
|
|
|
|
for num, tcase in enumerate(extr._get_tests(), 1):
|
|
|
|
|
test = _generate_test(extr, tcase)
|
|
|
|
|
test.__name__ = name + str(num)
|
|
|
|
|
setattr(TestExtractorResults, test.__name__, test)
|
|
|
|
|
enum = collections.defaultdict(int)
|
|
|
|
|
for result in tests:
|
|
|
|
|
name = "{1}_{2}".format(*result["#category"])
|
|
|
|
|
enum[name] += 1
|
|
|
|
|
|
|
|
|
|
method = _generate_method(result)
|
|
|
|
|
method.__name__ = "test_{}_{}".format(name, enum[name])
|
|
|
|
|
setattr(TestExtractorResults, method.__name__, method)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
generate_tests()
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
unittest.main(warnings='ignore')
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
unittest.main(warnings="ignore")
|
|
|
|
|