save some memory by using account handles instead of objects

This commit is contained in:
io 2021-08-19 10:40:57 +00:00
parent 4e16eef4e1
commit 191214dbd6
2 changed files with 24 additions and 11 deletions

View file

@ -6,12 +6,14 @@ import anyio
import aiohttp import aiohttp
import platform import platform
import pendulum import pendulum
import operator
import aiosqlite import aiosqlite
import contextlib import contextlib
from utils import shield from utils import shield
from pleroma import Pleroma from pleroma import Pleroma
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from functools import partial from functools import partial
from typing import Iterable, NewType
from third_party.utils import extract_post_content from third_party.utils import extract_post_content
USER_AGENT = ( USER_AGENT = (
@ -52,16 +54,20 @@ class PostFetcher:
async def __aexit__(self, *excinfo): async def __aexit__(self, *excinfo):
return await self._ctx_stack.__aexit__(*excinfo) return await self._ctx_stack.__aexit__(*excinfo)
# username@instance
AccountHandle = NewType('AccountHandle', str)
async def fetch_all(self): async def fetch_all(self):
"""fetch all following accounts, or an iterable of accounts if provided"""
await self._fedi.verify_credentials() await self._fedi.verify_credentials()
self._completed_accounts = {} self._completed_accounts = {}
async with anyio.create_task_group() as tg: async with anyio.create_task_group() as tg:
for acc in await self._fedi.following(): for acc in map(operator.itemgetter('fqn'), await self._fedi.following()):
tg.start_soon(self._do_account, acc) tg.start_soon(self._do_account, acc)
async def _do_account(self, acc): async def _do_account(self, acc: AccountHandle):
async with anyio.create_task_group() as tg: async with anyio.create_task_group() as tg:
self._completed_accounts[acc['fqn']] = done_ev = anyio.Event() self._completed_accounts[acc] = done_ev = anyio.Event()
tx, rx = anyio.create_memory_object_stream() tx, rx = anyio.create_memory_object_stream()
async with rx, tx: async with rx, tx:
tg.start_soon(self._process_pages, rx, acc) tg.start_soon(self._process_pages, rx, acc)
@ -72,7 +78,7 @@ class PostFetcher:
tg.cancel_scope.cancel() tg.cancel_scope.cancel()
async def _process_pages(self, stream, account): async def _process_pages(self, stream, account):
done_ev = self._completed_accounts[account['fqn']] done_ev = self._completed_accounts[account]
try: try:
async for activity in stream: async for activity in stream:
try: try:
@ -83,10 +89,10 @@ class PostFetcher:
# this means we've encountered an item we already have saved # this means we've encountered an item we already have saved
break break
self.erroneous_accounts.append(account['fqn']) self.erroneous_accounts.append(account)
raise raise
finally: finally:
print('Saving posts from', account['fqn'], 'to the DB') print('Saving posts from', account, 'to the DB')
await self._db.commit() await self._db.commit()
done_ev.set() done_ev.set()
@ -113,19 +119,19 @@ class PostFetcher:
# TODO figure out why i put shield here lol # TODO figure out why i put shield here lol
@shield @shield
async def _fetch_account(self, tx, account): async def _fetch_account(self, tx, account: AccountHandle):
done_ev = self._completed_accounts[account['fqn']] done_ev = self._completed_accounts[account]
try: try:
outbox = await self.fetch_outbox(account['fqn']) outbox = await self.fetch_outbox(account)
except Exception as exc: except Exception as exc:
import traceback import traceback
traceback.print_exception(type(exc), exc, exc.__traceback__) traceback.print_exception(type(exc), exc, exc.__traceback__)
done_ev.set() done_ev.set()
self.erroneous_accounts.append(account['fqn']) self.erroneous_accounts.append(account)
return return
print(f'Fetching posts for {account["acct"]}...') print(f'Fetching posts for {account}...')
next_page_url = outbox['first'] next_page_url = outbox['first']
while True: while True:

View file

@ -9,3 +9,10 @@ def shield(f):
with anyio.CancelScope(shield=True): with anyio.CancelScope(shield=True):
return await f(*args, **kwargs) return await f(*args, **kwargs)
return shielded return shielded
def removeprefix(s, prefix):
try:
return s.removeprefix(prefix)
except AttributeError:
# compatibility for pre-3.9
return s[len(prefix):] if s.startswith(prefix) else s