handle rate limits

This commit is contained in:
io 2021-09-17 06:35:54 +00:00
parent b906abe2b1
commit 788f8550c7
2 changed files with 35 additions and 2 deletions

View file

@ -12,8 +12,8 @@ import contextlib
from pleroma import Pleroma from pleroma import Pleroma
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from functools import partial from functools import partial
from utils import shield, suppress
from typing import Iterable, NewType from typing import Iterable, NewType
from utils import shield, HandleRateLimits, suppress
from third_party.utils import extract_post_content from third_party.utils import extract_post_content
USER_AGENT = ( USER_AGENT = (
@ -48,6 +48,7 @@ class PostFetcher:
raise_for_status=True, raise_for_status=True,
), ),
) )
self._rl_handler = HandleRateLimits(self._http)
self._db = await stack.enter_async_context(aiosqlite.connect(self.config['db_path'])) self._db = await stack.enter_async_context(aiosqlite.connect(self.config['db_path']))
await self._maybe_run_migrations() await self._maybe_run_migrations()
self._db.row_factory = aiosqlite.Row self._db.row_factory = aiosqlite.Row
@ -154,7 +155,7 @@ class PostFetcher:
next_page_url = outbox['first'] next_page_url = outbox['first']
while True: while True:
print(f'Fetching {next_page_url}... ') print(f'Fetching {next_page_url}... ')
async with self._http.get(next_page_url) as resp: page = await resp.json() async with self._rl_handler.request('GET', next_page_url) as resp: page = await resp.json()
for activity in page['orderedItems']: for activity in page['orderedItems']:
try: try:

View file

@ -34,3 +34,35 @@ def removeprefix(s, prefix):
except AttributeError: except AttributeError:
# compatibility for pre-3.9 # compatibility for pre-3.9
return s[len(prefix):] if s.startswith(prefix) else s return s[len(prefix):] if s.startswith(prefix) else s
async def sleep_until(dt):
await anyio.sleep((dt - datetime.now(timezone.utc)).total_seconds())
class HandleRateLimits:
def __init__(self, http):
self.http = http
def request(self, *args, **kwargs):
return _RateLimitContextManager(self.http, args, kwargs)
class _RateLimitContextManager(contextlib.AbstractAsyncContextManager):
def __init__(self, http, args, kwargs):
self.http = http
self.args = args
self.kwargs = kwargs
async def __aenter__(self):
self._request_cm = self.http.request(*self.args, **self.kwargs)
return await self._do_enter()
async def _do_enter(self):
resp = await self._request_cm.__aenter__()
if resp.headers.get('X-RateLimit-Remaining') not in {'0', '1'}:
return resp
await sleep_until(datetime.fromisoformat(resp.headers['X-RateLimit-Reset']))
await self._request_cm.__aexit__(*(None,)*3)
return await self.__aenter__()
async def __aexit__(self, *excinfo):
return await self._request_cm.__aexit__(*excinfo)