From 788f8550c706bc79f5c129ad92b4531a8938e4ad Mon Sep 17 00:00:00 2001
From: io
Date: Fri, 17 Sep 2021 06:35:54 +0000
Subject: [PATCH] handle rate limits

---
 fetch_posts.py |  5 +++--
 utils.py       | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/fetch_posts.py b/fetch_posts.py
index adfcee7..b60c7be 100755
--- a/fetch_posts.py
+++ b/fetch_posts.py
@@ -12,8 +12,8 @@ import contextlib
 from pleroma import Pleroma
 from bs4 import BeautifulSoup
 from functools import partial
-from utils import shield, suppress
 from typing import Iterable, NewType
+from utils import shield, HandleRateLimits, suppress
 from third_party.utils import extract_post_content
 
 USER_AGENT = (
@@ -48,6 +48,7 @@ class PostFetcher:
                 raise_for_status=True,
             ),
         )
+        self._rl_handler = HandleRateLimits(self._http)
         self._db = await stack.enter_async_context(aiosqlite.connect(self.config['db_path']))
         await self._maybe_run_migrations()
         self._db.row_factory = aiosqlite.Row
@@ -154,7 +155,7 @@ class PostFetcher:
         next_page_url = outbox['first']
         while True:
             print(f'Fetching {next_page_url}... ')
-            async with self._http.get(next_page_url) as resp: page = await resp.json()
+            async with self._rl_handler.request('GET', next_page_url) as resp: page = await resp.json()
 
             for activity in page['orderedItems']:
                 try:
diff --git a/utils.py b/utils.py
index ecc773d..8bb1daa 100644
--- a/utils.py
+++ b/utils.py
@@ -34,3 +34,61 @@ def removeprefix(s, prefix):
     except AttributeError:
         # compatibility for pre-3.9
         return s[len(prefix):] if s.startswith(prefix) else s
+
+async def sleep_until(dt):
+    """Sleep until the aware datetime *dt*; return at once if it has passed."""
+    delay = (dt - datetime.now(timezone.utc)).total_seconds()
+    # A reset time already in the past gives a negative delay, which anyio's
+    # trio backend rejects with ValueError, so clamp it to zero.
+    await anyio.sleep(max(0, delay))
+
+def _parse_rate_limit_reset(value):
+    """Parse an X-RateLimit-Reset header value into a timezone-aware datetime."""
+    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on.
+    if value.endswith('Z'):
+        value = value[:-1] + '+00:00'
+    dt = datetime.fromisoformat(value)
+    # A naive value cannot be subtracted from an aware "now"; treat it as UTC.
+    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
+
+class HandleRateLimits:
+    """Issue requests through *http*, waiting out rate limits the server reports."""
+
+    def __init__(self, http):
+        self.http = http
+
+    def request(self, *args, **kwargs):
+        """Return an async context manager wrapping ``http.request(*args, **kwargs)``."""
+        return _RateLimitContextManager(self.http, args, kwargs)
+
+class _RateLimitContextManager(contextlib.AbstractAsyncContextManager):
+    """Enter a request, retrying after the reset time while the limit is nearly spent."""
+
+    def __init__(self, http, args, kwargs):
+        self.http = http
+        self.args = args
+        self.kwargs = kwargs
+
+    async def __aenter__(self):
+        self._request_cm = self.http.request(*self.args, **self.kwargs)
+        return await self._do_enter()
+
+    async def _do_enter(self):
+        """Enter the pending request, waiting and re-issuing it while the limit is nearly spent."""
+        # Loop rather than recurse so repeated waits do not deepen the call stack.
+        while True:
+            resp = await self._request_cm.__aenter__()
+            remaining = resp.headers.get('X-RateLimit-Remaining')
+            reset = resp.headers.get('X-RateLimit-Reset')
+            # Without a reset time there is nothing to wait for; return as is.
+            if remaining not in {'0', '1'} or reset is None:
+                return resp
+
+            # Release the response before sleeping so its connection is not held
+            # for the whole wait, nor left unreleased if the sleep is cancelled.
+            await self._request_cm.__aexit__(None, None, None)
+            await sleep_until(_parse_rate_limit_reset(reset))
+            self._request_cm = self.http.request(*self.args, **self.kwargs)
+
+    async def __aexit__(self, *excinfo):
+        return await self._request_cm.__aexit__(*excinfo)