You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
misskey-ebooks-bot/rdbot.py

92 lines
3.2 KiB

3 years ago
import asyncio
import threading
3 years ago
from mipa.ext import commands, tasks
from mipa.router import Router
from mipac.models import Note
from mipac.util import check_multi_arg
3 years ago
import roboduck
3 years ago
# Read the Misskey settings from bot.cfg, which sits next to this script.
config = roboduck.configparser.ConfigParser()
config.read(roboduck.Path(__file__).parent.joinpath('bot.cfg'))

# Instance address (always addressed over HTTPS) and API token.
url = f"https://{config.get('misskey', 'instance_write')}"
token = config.get("misskey", "token")

# Optional content warning: a missing option or the literal text "none"
# (any letter case) both result in no content warning.
try:
    _cw_setting = config.get("misskey", "cw")
    contentwarning = None if _cw_setting.lower() == "none" else _cw_setting
except (TypeError, ValueError, roboduck.configparser.NoOptionError):
    contentwarning = None

# Stop early when the instance URL / token pair fails the mipac argument check.
if not check_multi_arg(url, token):
    raise Exception("Misskey instance and token are required.")
3 years ago
class MyBot(commands.Bot):
    """Misskey bot that posts generated sentences on timers and answers mentions.

    Three periodic tasks are started from ``on_ready``: two that publish a
    sentence from ``roboduck.create_sentence()`` (hourly and every five
    minutes) and one that triggers ``roboduck.update()`` every twelve hours.
    """

    def __init__(self):
        super().__init__()

    async def _post_sentence(self):
        """Publish one generated sentence as a public note."""
        text = roboduck.create_sentence()
        # Use this instance's client (as on_ready does) instead of the
        # module-level ``bot`` global, which only exists when run as a script.
        await self.client.note.action.send(content=text, visibility="public", cw=contentwarning)

    @tasks.loop(seconds=3600)
    async def loop_1h(self):
        """Post a generated sentence once per hour."""
        await self._post_sentence()

    @tasks.loop(seconds=300)
    async def loop_5m(self):
        """Post a generated sentence every five minutes."""
        await self._post_sentence()

    @tasks.loop(seconds=43200)
    async def loop_12h(self):
        """Start ``roboduck.update`` in a background daemon thread every 12 hours."""
        # Pass the callable itself. Writing ``target=roboduck.update()`` would
        # run the update right here, blocking the event loop, and hand the
        # thread only the call's return value.
        # NOTE(review): assumes roboduck.update is safe to run off the main
        # thread (e.g. opens its own database connection) - confirm.
        thread_update = threading.Thread(target=roboduck.update, daemon=True)
        thread_update.start()

    async def on_ready(self, ws):
        """Connect the channels, announce the start, and launch the timers.

        Args:
            ws: Websocket object handed over by the framework; it is passed
                on to ``Router``.
        """
        await Router(ws).connect_channel(["global", "main"])  # Connect to global and main channels
        # Build the message once so the note and the console line agree.
        started_message = roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!"
        await self.client.note.action.send(content=started_message, visibility="specified")
        self.loop_12h.start()  # Refresh via roboduck.update every 12 hours
        self.loop_1h.start()  # Hourly post
        self.loop_5m.start()  # Five-minute post
        print(started_message)

    async def on_mention(self, note: Note):
        """Reply to a mention with a generated sentence unless the author is a bot.

        Args:
            note: Mention payload. It is annotated as ``Note`` but is read for
                its ``user`` and ``note`` attributes.
        """
        # Imported locally, as in the original implementation of this handler.
        from mipac import LiteUser
        from mipac import UserManager

        # Author of the mention and the note that contains it.
        lite_user: LiteUser = note.user
        user_api: UserManager = lite_user.api
        og_note: Note = note.note

        # Fetch the full profile; it is only needed for the bot flag.
        detail = await user_api.action.fetch(lite_user.id, lite_user.username, lite_user.host)
        if detail.is_bot:
            # Never answer other bots.
            return

        text = user_api.action.get_mention(lite_user) + " "
        text += roboduck.create_sentence()
        await og_note.api.action.reply(content=text, cw=contentwarning)

    async def on_reconnect(self, ws):
        """Re-subscribe to the channels after the websocket reconnects.

        Args:
            ws: Websocket object handed over by the framework; it is passed
                on to ``Router``.
        """
        await Router(ws).connect_channel(["global", "main"])  # Connect to global and main channels
3 years ago
3 years ago
if __name__ == "__main__":
    # Initialise roboduck when its database file is missing or empty.
    db_file = roboduck.Path(__file__).parent.joinpath('roboduck.db')
    if not roboduck.os.path.exists(db_file) or roboduck.os.stat(db_file).st_size == 0:
        roboduck.init_bot()

    # Create the bot under the module-level name ``bot`` and run it until it stops.
    bot = MyBot()
    asyncio.run(bot.start(url, token))