#!/usr/bin/env python3 # SPDX-License-Identifier: AGPL-3.0-only import re import anyio import pleroma import contextlib from third_party import utils def parse_args(): return utils.arg_parser_factory(description='Reply service. Leave running in the background.').parse_args() class ReplyBot: def __init__(self, cfg): self.cfg = cfg self.pleroma = pleroma.Pleroma(access_token=cfg['access_token'], api_base_url=cfg['site']) async def run(self): async with self.pleroma as self.pleroma: self.me = (await self.pleroma.me())['id'] self.follows = frozenset(user['id'] for user in await self.pleroma.following(self.me)) async for notification in self.pleroma.stream_mentions(): await self.process_notification(notification) async def process_notification(self, notification): acct = "@" + notification['account']['acct'] # get the account's @ post_id = notification['status']['id'] context = await self.pleroma.status_context(post_id) # check if we've already been participating in this thread if self.check_hellthread_size(notification): return # check if we've already been participating in this thread if self.check_thread_length(context): return content = self.extract_toot(notification['status']['content'], self.cfg) if content.startswith('reply '): command = content.split(' ') await self.process_command(context, notification, command[0], command[1]) elif content in {'pin', 'unpin'}: await self.process_command(context, notification, content, '') else: await self.reply(notification) def check_hellthread_size(self, notification) -> bool: """return whether the notification should not be replied to due to sheer amount of mentions""" if len(notification['status']['mentions']) >= self.cfg['hellthread_threshold']: return True return False def check_thread_length(self, context) -> bool: """return whether the thread is too long to reply to""" posts = 0 for post in context['ancestors']: if post['account']['id'] == self.me: posts += 1 if posts >= self.cfg['max_thread_length']: return True return False async def process_command(self, context, notification, command, argument): post_id = notification['status']['id'] if notification['account']['id'] not in self.follows: # this user is unauthorized await self.pleroma.react(post_id, '❌') return # find the post the user is talking about for post in context['ancestors']: if post['id'] == notification['status']['in_reply_to_id']: target_post_id = post['id'] try: if command in ('pin', 'unpin'): await (self.pleroma.pin if command == 'pin' else self.pleroma.unpin)(target_post_id) elif command == 'reply': try: status = await self.pleroma.get_status(argument) if status['content'] != "": keywords = self.cleanup_toot(utils.extract_post_content(status['content']), self.cfg) else: keywords = None toot = await utils.make_post(self.cfg, keywords) toot = self.cleanup_toot(toot, self.cfg) await self.pleroma.reply(status, toot, cw=self.cfg['cw']) except: async with anyio.create_task_group() as tg: tg.start_soon(self.pleroma.react, post_id, '❌') tg.start_soon(self.pleroma.reply, notification['status'], 'Error: ' + exc.args[0]) except pleroma.BadRequest as exc: async with anyio.create_task_group() as tg: tg.start_soon(self.pleroma.react, post_id, '❌') tg.start_soon(self.pleroma.reply, notification['status'], 'Error: ' + exc.args[0]) else: await self.pleroma.react(post_id, '✅') async def reply(self, notification): status = notification['status'] if status['content'] != "": keywords = self.cleanup_toot(utils.extract_post_content(status['content']), self.cfg) else: keywords = None toot = await utils.make_post(self.cfg, keywords) toot = self.cleanup_toot(toot, self.cfg) await self.pleroma.reply(status, toot, cw=self.cfg['cw']) @staticmethod def cleanup_toot(text, cfg): PAIRED_PUNCTUATION = re.compile(r"[{}]".format(re.escape('[](){}"‘’“”«»„'))) if cfg['strip_paired_punctuation']: text = PAIRED_PUNCTUATION.sub("", text) text = text.replace("@", "@\u200b") # sanitize mentions text = utils.remove_mentions(cfg, text) return text @staticmethod def extract_toot(toot, cfg): text = utils.extract_post_content(toot) text = re.sub(r"^@\S+\s", r"", text) # remove the initial mention # text = text.lower() # treat text as lowercase for easier keyword matching (if this bot uses it) return text async def amain(): args = parse_args() cfg = utils.load_config(args.cfg) await ReplyBot(cfg).run() if __name__ == '__main__': with contextlib.suppress(KeyboardInterrupt): anyio.run(amain)