|
|
|
@ -1,7 +1,7 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
|
|
# Copyright 2018-2021 Mike Fährmann
|
|
|
|
|
# Copyright 2018-2022 Mike Fährmann
|
|
|
|
|
#
|
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
|
|
@ -13,9 +13,9 @@ import unittest
|
|
|
|
|
from unittest.mock import Mock, MagicMock, patch
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
import base64
|
|
|
|
|
import logging
|
|
|
|
|
import os.path
|
|
|
|
|
import binascii
|
|
|
|
|
import tempfile
|
|
|
|
|
import threading
|
|
|
|
|
import http.server
|
|
|
|
@ -23,6 +23,7 @@ import http.server
|
|
|
|
|
|
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
from gallery_dl import downloader, extractor, output, config, path # noqa E402
|
|
|
|
|
from gallery_dl.downloader.http import MIME_TYPES, SIGNATURE_CHECKS # noqa E402
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MockDownloaderModule(Mock):
|
|
|
|
@ -172,48 +173,52 @@ class TestHTTPDownloader(TestDownloaderBase):
|
|
|
|
|
|
|
|
|
|
port = 8088
|
|
|
|
|
cls.address = "http://127.0.0.1:{}".format(port)
|
|
|
|
|
cls._jpg = cls.address + "/image.jpg"
|
|
|
|
|
cls._png = cls.address + "/image.png"
|
|
|
|
|
cls._gif = cls.address + "/image.gif"
|
|
|
|
|
|
|
|
|
|
server = http.server.HTTPServer(("", port), HttpRequestHandler)
|
|
|
|
|
threading.Thread(target=server.serve_forever, daemon=True).start()
|
|
|
|
|
|
|
|
|
|
def _run_test(self, ext, input, output,
|
|
|
|
|
extension, expected_extension=None):
|
|
|
|
|
TestDownloaderBase._run_test(
|
|
|
|
|
self, self.address + "/image." + ext, input, output,
|
|
|
|
|
extension, expected_extension)
|
|
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
|
self.downloader.minsize = self.downloader.maxsize = None
|
|
|
|
|
|
|
|
|
|
def test_http_download(self):
|
|
|
|
|
self._run_test(self._jpg, None, DATA_JPG, "jpg", "jpg")
|
|
|
|
|
self._run_test(self._png, None, DATA_PNG, "png", "png")
|
|
|
|
|
self._run_test(self._gif, None, DATA_GIF, "gif", "gif")
|
|
|
|
|
self._run_test("jpg", None, DATA["jpg"], "jpg", "jpg")
|
|
|
|
|
self._run_test("png", None, DATA["png"], "png", "png")
|
|
|
|
|
self._run_test("gif", None, DATA["gif"], "gif", "gif")
|
|
|
|
|
|
|
|
|
|
def test_http_offset(self):
|
|
|
|
|
self._run_test(self._jpg, DATA_JPG[:123], DATA_JPG, "jpg", "jpg")
|
|
|
|
|
self._run_test(self._png, DATA_PNG[:12] , DATA_PNG, "png", "png")
|
|
|
|
|
self._run_test(self._gif, DATA_GIF[:1] , DATA_GIF, "gif", "gif")
|
|
|
|
|
self._run_test("jpg", DATA["jpg"][:123], DATA["jpg"], "jpg", "jpg")
|
|
|
|
|
self._run_test("png", DATA["png"][:12] , DATA["png"], "png", "png")
|
|
|
|
|
self._run_test("gif", DATA["gif"][:1] , DATA["gif"], "gif", "gif")
|
|
|
|
|
|
|
|
|
|
def test_http_extension(self):
|
|
|
|
|
self._run_test(self._jpg, None, DATA_JPG, None, "jpg")
|
|
|
|
|
self._run_test(self._png, None, DATA_PNG, None, "png")
|
|
|
|
|
self._run_test(self._gif, None, DATA_GIF, None, "gif")
|
|
|
|
|
self._run_test("jpg", None, DATA["jpg"], None, "jpg")
|
|
|
|
|
self._run_test("png", None, DATA["png"], None, "png")
|
|
|
|
|
self._run_test("gif", None, DATA["gif"], None, "gif")
|
|
|
|
|
|
|
|
|
|
def test_http_adjust_extension(self):
|
|
|
|
|
self._run_test(self._jpg, None, DATA_JPG, "png", "jpg")
|
|
|
|
|
self._run_test(self._png, None, DATA_PNG, "gif", "png")
|
|
|
|
|
self._run_test(self._gif, None, DATA_GIF, "jpg", "gif")
|
|
|
|
|
self._run_test("jpg", None, DATA["jpg"], "png", "jpg")
|
|
|
|
|
self._run_test("png", None, DATA["png"], "gif", "png")
|
|
|
|
|
self._run_test("gif", None, DATA["gif"], "jpg", "gif")
|
|
|
|
|
|
|
|
|
|
def test_http_filesize_min(self):
|
|
|
|
|
url = self.address + "/image.gif"
|
|
|
|
|
pathfmt = self._prepare_destination(None, extension=None)
|
|
|
|
|
self.downloader.minsize = 100
|
|
|
|
|
with self.assertLogs(self.downloader.log, "WARNING"):
|
|
|
|
|
success = self.downloader.download(self._gif, pathfmt)
|
|
|
|
|
success = self.downloader.download(url, pathfmt)
|
|
|
|
|
self.assertFalse(success)
|
|
|
|
|
|
|
|
|
|
def test_http_filesize_max(self):
|
|
|
|
|
url = self.address + "/image.jpg"
|
|
|
|
|
pathfmt = self._prepare_destination(None, extension=None)
|
|
|
|
|
self.downloader.maxsize = 100
|
|
|
|
|
with self.assertLogs(self.downloader.log, "WARNING"):
|
|
|
|
|
success = self.downloader.download(self._jpg, pathfmt)
|
|
|
|
|
success = self.downloader.download(url, pathfmt)
|
|
|
|
|
self.assertFalse(success)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -237,15 +242,10 @@ class TestTextDownloader(TestDownloaderBase):
|
|
|
|
|
class HttpRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
|
|
|
|
|
|
|
def do_GET(self):
|
|
|
|
|
if self.path == "/image.jpg":
|
|
|
|
|
content_type = "image/jpeg"
|
|
|
|
|
output = DATA_JPG
|
|
|
|
|
elif self.path == "/image.png":
|
|
|
|
|
content_type = "image/png"
|
|
|
|
|
output = DATA_PNG
|
|
|
|
|
elif self.path == "/image.gif":
|
|
|
|
|
content_type = "image/gif"
|
|
|
|
|
output = DATA_GIF
|
|
|
|
|
if self.path.startswith("/image."):
|
|
|
|
|
ext = self.path.rpartition(".")[2]
|
|
|
|
|
content_type = MIME_TYPES.get(ext)
|
|
|
|
|
output = DATA[ext]
|
|
|
|
|
else:
|
|
|
|
|
self.send_response(404)
|
|
|
|
|
self.wfile.write(self.path.encode())
|
|
|
|
@ -275,31 +275,57 @@ class HttpRequestHandler(http.server.BaseHTTPRequestHandler):
|
|
|
|
|
self.wfile.write(output)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DATA_JPG = base64.standard_b64decode("""
|
|
|
|
|
/9j/4AAQSkZJRgABAQEASABIAAD/2wBD
|
|
|
|
|
AAEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB
|
|
|
|
|
AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB
|
|
|
|
|
AQEBAQEBAQEBAQEBAQEBAQH/2wBDAQEB
|
|
|
|
|
AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB
|
|
|
|
|
AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB
|
|
|
|
|
AQEBAQEBAQEBAQEBAQH/wAARCAABAAED
|
|
|
|
|
AREAAhEBAxEB/8QAFAABAAAAAAAAAAAA
|
|
|
|
|
AAAAAAAACv/EABQQAQAAAAAAAAAAAAAA
|
|
|
|
|
AAAAAAD/xAAUAQEAAAAAAAAAAAAAAAAA
|
|
|
|
|
AAAA/8QAFBEBAAAAAAAAAAAAAAAAAAAA
|
|
|
|
|
AP/aAAwDAQACEQMRAD8AfwD/2Q==""")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DATA_PNG = base64.standard_b64decode("""
|
|
|
|
|
iVBORw0KGgoAAAANSUhEUgAAAAEAAAAB
|
|
|
|
|
CAAAAAA6fptVAAAACklEQVQIHWP4DwAB
|
|
|
|
|
AQEANl9ngAAAAABJRU5ErkJggg==""")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DATA_GIF = base64.standard_b64decode("""
|
|
|
|
|
R0lGODdhAQABAIAAAP///////ywAAAAA
|
|
|
|
|
AQABAAACAkQBADs=""")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
DATA = {
|
|
|
|
|
"jpg" : binascii.a2b_base64(
|
|
|
|
|
"/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB"
|
|
|
|
|
"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQH/2wBDAQEB"
|
|
|
|
|
"AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEB"
|
|
|
|
|
"AQEBAQEBAQEBAQEBAQH/wAARCAABAAEDAREAAhEBAxEB/8QAFAABAAAAAAAAAAAA"
|
|
|
|
|
"AAAAAAAACv/EABQQAQAAAAAAAAAAAAAAAAAAAAD/xAAUAQEAAAAAAAAAAAAAAAAA"
|
|
|
|
|
"AAAA/8QAFBEBAAAAAAAAAAAAAAAAAAAAAP/aAAwDAQACEQMRAD8AfwD/2Q=="),
|
|
|
|
|
"png" : binascii.a2b_base64(
|
|
|
|
|
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAAAAAA6fptVAAAACklEQVQIHWP4DwAB"
|
|
|
|
|
"AQEANl9ngAAAAABJRU5ErkJggg=="),
|
|
|
|
|
"gif" : binascii.a2b_base64(
|
|
|
|
|
"R0lGODdhAQABAIAAAP///////ywAAAAAAQABAAACAkQBADs="),
|
|
|
|
|
"bmp" : b"BM",
|
|
|
|
|
"webp": b"RIFF????WEBP",
|
|
|
|
|
"avif": b"????ftypavif",
|
|
|
|
|
"svg" : b"<?xml",
|
|
|
|
|
"ico" : b"\x00\x00\x01\x00",
|
|
|
|
|
"cur" : b"\x00\x00\x02\x00",
|
|
|
|
|
"psd" : b"8BPS",
|
|
|
|
|
"webm": b"\x1A\x45\xDF\xA3",
|
|
|
|
|
"ogg" : b"OggS",
|
|
|
|
|
"wav" : b"RIFF????WAVE",
|
|
|
|
|
"mp3" : b"ID3",
|
|
|
|
|
"zip" : b"PK\x03\x04",
|
|
|
|
|
"rar" : b"\x52\x61\x72\x21\x1A\x07",
|
|
|
|
|
"7z" : b"\x37\x7A\xBC\xAF\x27\x1C",
|
|
|
|
|
"pdf" : b"%PDF-",
|
|
|
|
|
"swf" : b"CWS",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# reverse mime types mapping
|
|
|
|
|
MIME_TYPES = {
|
|
|
|
|
ext: mtype
|
|
|
|
|
for mtype, ext in MIME_TYPES.items()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_tests():
|
|
|
|
|
def _generate_test(ext):
|
|
|
|
|
def test(self):
|
|
|
|
|
self._run_test(ext, None, DATA[ext], "bin", ext)
|
|
|
|
|
test.__name__ = "test_http_ext_" + ext
|
|
|
|
|
return test
|
|
|
|
|
|
|
|
|
|
for ext in DATA:
|
|
|
|
|
test = _generate_test(ext)
|
|
|
|
|
setattr(TestHTTPDownloader, test.__name__, test)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
generate_tests()
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
unittest.main()
|
|
|
|
|