You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
misskey-ebooks-bot/rdbot.py

163 lines
5.9 KiB

import asyncio
import threading
import re
from time import sleep
from mipa.ext import commands, tasks
from mipa.router import Router
from mipac import LiteUser, UserManager, UserDetailed, NotificationReaction
from mipac.models import Note
from mipac.util import check_multi_arg
import roboduck
# Load Misskey configuration
config = roboduck.configparser.ConfigParser()
config.read(roboduck.Path(__file__).parent.joinpath('bot.cfg'))
url = "https://" + config.get("misskey", "instance_write")
token = config.get("misskey", "token")
instance_val = config.get("misskey", "instance_write")
instance_admin = config.get("misskey", "instance_admin")
post_frequency = float(config.get("markov", "post_frequency"))
try:
contentwarning = config.get("misskey", "cw")
if contentwarning.lower() == "none":
contentwarning = None
except (TypeError, ValueError, roboduck.configparser.NoOptionError):
contentwarning = None
if not check_multi_arg(url, token):
raise Exception("Misskey instance and token are required.")
class MyBot(commands.Bot):
def __init__(self):
super().__init__()
@staticmethod
async def get_bot_data(user_manager: UserManager):
bot_user = await user_manager.action.get_me()
bot_host = instance_val
bot_short = '@' + bot_user.username
bot_long = bot_short + '@' + bot_host
return bot_host, bot_short, bot_long
@tasks.loop(seconds=3600)
async def loop_1h(self):
text = roboduck.create_sentence()
await bot.client.note.action.send(content=text, visibility="public", cw=contentwarning)
@tasks.loop(seconds=600)
async def loop_10m(self):
text = roboduck.create_sentence()
await bot.client.note.action.send(content=text, visibility="public", cw=contentwarning)
@tasks.loop(seconds=post_frequency)
async def loop_custom(self):
print("POSTED! " + roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
text = roboduck.create_sentence()
await bot.client.note.action.send(content=text, visibility="public", cw=contentwarning)
@tasks.loop(seconds=43200)
async def loop_12h(self):
thread_update = threading.Thread(target=roboduck.update())
thread_update.daemon = True
thread_update.start()
async def on_ready(self, ws):
await Router(ws).connect_channel(["global", "main"]) # Connect to global and main channels
await self.client.note.action.send(
content=roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!",
visibility="specified")
self.loop_12h.start() # Launching renew posts every 12 hours
self.loop_1h.start() #
# self.loop_10m.start()
self.loop_custom.start() # Custom duration bot
print(roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!")
async def on_mention(self, note: Note):
# because this doesn't work for some reason*, I redid this shit
# Get note user properties
lite_user: LiteUser = note.__getattribute__("user")
# Get user manager (for stuff)
user_api: UserManager = lite_user.api
# Get the bot's handle...
bot_host, bot_short, bot_long = await self.get_bot_data(user_api)
# print("Bot\n Host: {0} - Short handle: {1} - Long handle: {2}".format(bot_host, bot_short, bot_long))
# Get note author and host
author_id = lite_user.id
author_user = lite_user.username
author_host = lite_user.host if lite_user.host is not None else bot_host
# print("Author: {0} - Host: {1}".format(author_user, author_host))
reply_to = "@{0}".format(author_user) if author_host == bot_host else "@{0}@{1}".format(author_user,
author_host)
# Get user details (only to validate if it's a f***ing bot)
user_detail = await user_api.action.fetch(author_id, author_user, author_host)
# is the author a
is_bot = user_detail.is_bot
# Get original note content
og_note: Note = note.__getattribute__("note")
# print("Note content: {0} ".format(og_note.content))
# Regex to do some magic
user_regex = "(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?"
# Find all mentions in the note
mentions = re.findall(user_regex, og_note.content)
# All users mentioned in the note
# print("All mentions:")
# print(mentions)
# Filter out the original author and the bot from the mentions
filtered = list(filter(lambda user: user not in [author_user, bot_short, bot_long], mentions))
# print("Filtered mentions:")
# print(filtered)
# Validate if it's not a bot, and then reply
if not is_bot:
text = reply_to + " " + ' '.join(filtered) + " "
text += roboduck.create_sentence()
# print("Resulting note:\n {0}".format(text))
await og_note.api.action.reply(content=text, cw=contentwarning)
async def on_reconnect(self, ws):
await Router(ws).connect_channel(["global", "main"]) # Connect to global and main channels
async def on_reply(self, note: Note):
# Just... follow the thread, eh?
await self.on_mention(note)
async def on_user_follow(self, user: UserDetailed):
# I'll add stuff here
print("test")
async def on_reaction(self, notice: NotificationReaction):
# I'll also add stuff here... later.
print("OI!")
if __name__ == "__main__":
databasepath = roboduck.Path(__file__).parent.joinpath('roboduck.db')
if not (roboduck.os.path.exists(databasepath) and roboduck.os.stat(databasepath).st_size != 0):
roboduck.init_bot()
bot = MyBot()
try:
asyncio.run(bot.start(url, token))
except Exception as e:
print("Failed to run, retrying...")
sleep(10)
asyncio.run(bot.start(url, token))