# -*- coding: utf-8 -*- # Copyright 2014, 2015 Mike Fährmann # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 as # published by the Free Software Foundation. """Common classes and constants used by extractor modules.""" import time import queue import requests import threading from .message import Message from .. import config class Extractor(): category = "" subcategory = "" directory_fmt = [""] filename_fmt = "" def __init__(self): self.session = requests.Session() def __iter__(self): return self.items() def items(self): yield Message.Version, 1 return def request(self, url, *args, **kwargs): return safe_request(self.session, url, *args, **kwargs) def enable_useragent(self): self.session.headers["User-Agent"] = ( "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20100101 Firefox/24.0" ) class AsynchronousExtractor(Extractor): def __init__(self): Extractor.__init__(self) queue_size = int(config.get(("queue-size",), default=5)) self.__queue = queue.Queue(maxsize=queue_size) self.__thread = threading.Thread(target=self.async_items, daemon=True) def __iter__(self): get = self.__queue.get done = self.__queue.task_done self.__thread.start() while True: task = get() if task is None: return yield task done() def async_items(self): put = self.__queue.put try: for task in self.items(): put(task) except Exception: import traceback print(traceback.format_exc()) put(None) def safe_request(session, url, method="GET", *args, **kwargs): tries = 0 while True: # try to connect to remote source try: r = session.request(method, url, *args, **kwargs) except requests.exceptions.ConnectionError: tries += 1 time.sleep(1) if tries == 5: raise continue # reject error-status-codes if r.status_code != requests.codes.ok: tries += 1 time.sleep(1) if tries == 5: r.raise_for_status() continue # everything ok -- proceed to download return r