|
|
|
import asyncio
|
|
|
|
import threading
|
|
|
|
import re
|
|
|
|
|
|
|
|
from mipa.ext import commands, tasks
|
|
|
|
from mipa.router import Router
|
|
|
|
from mipac import LiteUser, UserManager, UserDetailed, NotificationReaction
|
|
|
|
from mipac.models import Note
|
|
|
|
from mipac.util import check_multi_arg
|
|
|
|
|
|
|
|
import roboduck
|
|
|
|
|
|
|
|
# Load Misskey configuration from the bot.cfg file located next to this module.
config = roboduck.configparser.ConfigParser()
config.read(roboduck.Path(__file__).parent.joinpath('bot.cfg'))

instance_val = config.get("misskey", "instance_write")
token = config.get("misskey", "token")

# Validate the raw option values. The URL is always non-empty once the
# "https://" prefix is prepended, so checking it would never catch a blank
# instance name.
if not (instance_val and token):
    raise Exception("Misskey instance and token are required.")

url = "https://" + instance_val

# Optional content warning attached to every note the bot sends.
# A missing option, or the literal text "none" (any case), disables it.
try:
    contentwarning = config.get("misskey", "cw")
    if contentwarning.lower() == "none":
        contentwarning = None
except (TypeError, ValueError, roboduck.configparser.NoOptionError):
    contentwarning = None
|
|
|
|
|
|
|
|
|
|
|
|
class MyBot(commands.Bot):
    """Misskey bot that posts generated sentences and answers mentions/replies.

    Sentence generation and data refresh are delegated to the ``roboduck``
    module; this class wires them to Misskey events and periodic tasks.
    """

    # Matches "@user" and "@user@host" handles inside note text.
    # Raw string avoids invalid-escape warnings; plain non-capturing groups
    # match the same text as the former atomic groups (nothing follows the
    # pattern, so no backtracking can occur) and work on Python < 3.11 too.
    _MENTION_RE = re.compile(r"@[\w\-]+(?:@[\w\-\.]+)?")

    def __init__(self):
        super().__init__()

    @staticmethod
    async def get_bot_data(user_manager: UserManager):
        """Return the bot's host and its short and long handles.

        :param user_manager: user manager used to fetch the bot's own account.
        :return: tuple ``(bot_host, bot_short, bot_long)`` where ``bot_short``
            is ``@username`` and ``bot_long`` is ``@username@host``; the host
            is the configured ``instance_write`` value.
        """
        bot_user = await user_manager.action.get_me()
        bot_host = instance_val
        bot_short = '@' + bot_user.username
        bot_long = bot_short + '@' + bot_host
        return bot_host, bot_short, bot_long

    async def _post_sentence(self):
        """Send one generated sentence as a public note."""
        text = roboduck.create_sentence()
        # Use this instance's client rather than the module-level ``bot``
        # name, which only exists when the file is run as a script.
        await self.client.note.action.send(content=text, visibility="public", cw=contentwarning)

    async def _connect_channels(self, ws):
        """Connect the websocket to the global and main channels."""
        await Router(ws).connect_channel(["global", "main"])

    @tasks.loop(seconds=3600)
    async def loop_1h(self):
        """Post a generated sentence every hour."""
        await self._post_sentence()

    @tasks.loop(seconds=600)
    async def loop_10m(self):
        """Post a generated sentence every ten minutes."""
        await self._post_sentence()

    @tasks.loop(seconds=43200)
    async def loop_12h(self):
        """Run ``roboduck.update`` in a background daemon thread every 12 hours."""
        # Pass the callable itself: calling it here would run the update
        # synchronously on the event loop and hand Thread a ``None`` target.
        thread_update = threading.Thread(target=roboduck.update, daemon=True)
        thread_update.start()

    async def on_ready(self, ws):
        """Connect channels, announce start-up and launch the periodic tasks."""
        await self._connect_channels(ws)
        started_msg = roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!"
        await self.client.note.action.send(content=started_msg, visibility="specified")
        self.loop_12h.start()  # Launching renew posts every 12 hours
        self.loop_1h.start()
        self.loop_10m.start()
        print(started_msg)

    async def on_mention(self, note: Note):
        """Reply to a mention with a generated sentence unless the author is a bot.

        The reply addresses the author first, then every other handle found in
        the note text (the author's and the bot's own handles are removed),
        followed by the generated sentence.

        :param note: event payload; its ``user`` and ``note`` attributes are
            read here. NOTE(review): the ``Note`` annotation may not be the
            exact runtime type delivered by the framework - confirm.
        """
        lite_user: LiteUser = note.user
        # User manager, needed to look up both the bot and the author
        user_api: UserManager = lite_user.api

        bot_host, bot_short, bot_long = await self.get_bot_data(user_api)

        # A missing host on the author means the author is on the bot's instance
        author_user = lite_user.username
        author_host = lite_user.host if lite_user.host is not None else bot_host

        # Fetch user details only to find out whether the author is a bot
        user_detail = await user_api.action.fetch(lite_user.id, author_user, author_host)
        if user_detail.is_bot:
            return

        author_short = "@{0}".format(author_user)
        author_long = "@{0}@{1}".format(author_user, author_host)
        reply_to = author_short if author_host == bot_host else author_long

        og_note: Note = note.note

        # Content is guarded against None so that a note without text does
        # not raise inside the regex search.
        mentions = self._MENTION_RE.findall(og_note.content or "")

        # Mentions carry a leading "@", so compare against full handles;
        # comparing against the bare username never removed the author.
        excluded = {author_short, author_long, bot_short, bot_long}
        filtered = [handle for handle in mentions if handle not in excluded]

        text = reply_to + " " + ' '.join(filtered) + " "
        text += roboduck.create_sentence()
        await og_note.api.action.reply(content=text, cw=contentwarning)

    async def on_reconnect(self, ws):
        """Re-connect to the global and main channels after a reconnect."""
        await self._connect_channels(ws)

    async def on_reply(self, note: Note):
        """Handle a reply exactly like a mention, so the bot follows the thread."""
        await self.on_mention(note)

    async def on_user_follow(self, user: UserDetailed):
        """Placeholder follow handler; currently only prints a marker."""
        print("test")

    async def on_reaction(self, notice: NotificationReaction):
        """Placeholder reaction handler; currently only prints a marker."""
        print("OI!")
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # The database file is expected next to this script.
    databasepath = roboduck.Path(__file__).parent.joinpath('roboduck.db')

    # Initialise when the file is missing or empty. The size is only queried
    # when the file exists, so stat() is never called on a missing path.
    db_missing = not roboduck.os.path.exists(databasepath)
    if db_missing or roboduck.os.stat(databasepath).st_size == 0:
        roboduck.init_bot()

    # Kept as a module-level name: code elsewhere in this file refers to `bot`.
    bot = MyBot()
    asyncio.run(bot.start(url, token))
|