# misskey-ebooks-bot/roboduck.py

import requests
import json
import os
import sys
import regex
import configparser
import markovify
import sqlite3
from pathlib import Path
from datetime import *
from time import sleep
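
# All functions below read their settings from a `bot.cfg` file located next to
# this script. The config file itself is not part of this module; the sketch
# below is only an assumed layout, inferred from the keys accessed further down.
# Section and option names are the ones used in this file; the values and the
# account names are illustrative placeholders, not shipped defaults:
#
#   [misskey]
#   users = @alice@example.social;@bob@example.town
#
#   [markov]
#   includeReplies = True
#   includeMyRenotes = False
#   excludeNsfw = True
#   exclude_links = False
#   min_notes = 1000
#   max_notes = 10000
#   test_output = True
#   tries = 250
#   max_overlap_ratio = 0.7
#   max_overlap_total = 10
#   max_words = 30
#   min_words = 5
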
def check_str_to_bool(text) -> bool:
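    """Convert a config string to a boolean; anything other than "False"/"false"/"FALSE" counts as True."""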
if text == "True" or text == "true" or text == "TRUE":
return True
elif text == "False" or text == "false" or text == "FALSE":
return False
else:
return True
def get_endpoint(instance: str) -> str:
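    """Detect the server software behind `instance`.

    Probes /api/ping (Misskey) and /api/v1/instance (Mastodon/Pleroma) and
    returns "Misskey", "Mastodon", "Pleroma" or "unknown".
    """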
    # Try Misskey
    url = "https://" + instance + "/api/ping"
    req = requests.post(url)
    if req.status_code == 200 and ("pong" in req.json()):
        return "Misskey"
    # Try Mastodon and Pleroma
    url = "https://" + instance + "/api/v1/instance"  # Pleroma uses the same API as Mastodon
    req = requests.get(url)
    if req.status_code == 200:
        version = req.json()["version"]
        if version.find("(compatible; Pleroma") > 0:  # This marker only appears in Pleroma version strings; Mastodon does not include it
            return "Pleroma"
        else:
            return "Mastodon"
    return "unknown"


def misskey_get_user_id(username: str, instance: str) -> str:
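    """Look up a user's Misskey ID via /api/users/show; returns "" on HTTP errors."""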
url = "https://" + instance + "/api/users/show"
try:
req = requests.post(url, json={"username": username, "host": instance})
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Username! " + str(err))
return ""
return req.json()["id"]
def misskey_get_notes(**kwargs):
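    """Fetch and pre-process notes of one user from a Misskey instance.

    Expects the keyword arguments `username` and `instance`, plus either
    `min_notes` (initial import: page backwards until at least that many notes
    were read) or `lastnote` (update mode: only read notes newer than that
    note ID). Returns a list of dicts with the keys id, text, timestamp and
    user_id.
    """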
note_id = "k"
since_id = ""
min_notes = 0
notes_list = []
return_list = []
username = kwargs["username"]
instance = kwargs["instance"]
print("Reading notes for @" + username + "@" + instance + ".")
if kwargs:
if "min_notes" in kwargs:
# print("min_notes found!")
init = True
min_notes = kwargs["min_notes"]
elif "lastnote" in kwargs:
# print("Lastnote found!")
init = False
since_id = kwargs["lastnote"]
else:
print("Wrong arguments given!")
print("Exiting routine!")
return
else:
print("No arguments given!")
print("Exiting routine")
return None
# Load configuration
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg'))
userid = misskey_get_user_id(username, instance) # Here are only Misskey ID is necessary so no need to check
# endpoint again
# Read & Sanitize Inputs from Config File
try:
include_replies = check_str_to_bool(config.get("markov", "includeReplies"))
except (TypeError, ValueError, configparser.NoOptionError):
include_replies = True
try:
include_my_renotes = check_str_to_bool(config.get("markov", "includeMyRenotes"))
except (TypeError, ValueError, configparser.NoOptionError):
include_my_renotes = False
try:
exclude_nsfw = check_str_to_bool(config.get("markov", "excludeNsfw"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_nsfw = True
try:
exclude_links = check_str_to_bool(config.get("markov", "exclude_links"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_links = False
run = True
oldnote = ""
while run:
if (init and len(notes_list) >= min_notes) or (oldnote == note_id):
break
if not init: # sinceid should only be used when updating the database so the json object has to be parsed
# every time
api_json = {
"userId": userid,
"includeReplies": include_replies,
"limit": 100,
"includeMyRenotes": include_my_renotes,
"withFiles": False,
"excludeNsfw": exclude_nsfw,
"untilId": note_id,
"sinceId": since_id}
else:
api_json = {
"userId": userid,
"includeReplies": include_replies,
"limit": 100,
"includeMyRenotes": include_my_renotes,
"withFiles": False,
"excludeNsfw": exclude_nsfw,
"untilId": note_id}
try:
req = requests.post("https://" + instance + "/api/users/notes", json=api_json)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Posts! " + str(err))
sys.exit(1)
for jsonObj in req.json():
notes_list.append(jsonObj)
if len(notes_list) == 0:
print("No new notes to load!")
return []
oldnote = note_id
note_id = notes_list[len(notes_list) - 1]["id"]
print(str(len(notes_list)) + " Notes read.")
print("Processing notes...")
for element in notes_list:
last_time = element["createdAt"]
last_timestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000)
content = element["text"]
if content is None: # Skips empty notes (I don't know how there could be empty notes)
continue
content = regex.sub(r"(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?", '',
content) # Remove instance name with regular expression
content = content.replace("::", ": :") # Break long emoji chains
content = content.replace("@", "@" + chr(8203))
if exclude_links:
content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))",
"", content)
note_dict = {"id": element["id"], "text": content, "timestamp": last_timestamp, "user_id": userid}
return_list.append(note_dict)
return return_list
def mastodon_get_user_id(username: str, instance: str) -> str:
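    """Look up a user's account ID via /api/v1/accounts/lookup; returns "" on HTTP errors."""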
url = "https://" + instance + "/api/v1/accounts/lookup?acct=" + username
try:
req = requests.get(url)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Username! " + str(err))
return ""
return req.json()["id"]
def mastodon_get_notes(**kwargs):
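    """Fetch and pre-process statuses of one user from a Mastodon instance.

    Takes the same keyword arguments as misskey_get_notes (`username`,
    `instance` and either `min_notes` or `lastnote`) and returns the same
    list-of-dicts format.
    """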
note_id = "k"
since_id = ""
min_notes = 0
notes_list = []
return_list = []
username = kwargs["username"]
instance = kwargs["instance"]
print("Reading notes for @" + username + "@" + instance + ".")
if kwargs:
if "min_notes" in kwargs:
# print("min_notes found!")
init = True
min_notes = kwargs["min_notes"]
elif "lastnote" in kwargs:
# print("Lastnote found!")
init = False
since_id = kwargs["lastnote"]
else:
print("Wrong arguments given!")
print("Exiting routine!")
return
else:
print("No arguments given!")
print("Exiting routine")
return None
# Load configuration
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg'))
userid = mastodon_get_user_id(username, instance) # Here are only Mastodon ID is necessary so no need to check
# endpoint again
# Read & Sanitize Inputs from Config File
try:
include_replies = check_str_to_bool(config.get("markov", "includeReplies"))
except (TypeError, ValueError, configparser.NoOptionError):
include_replies = True
try:
include_my_renotes = check_str_to_bool(config.get("markov", "includeMyRenotes"))
except (TypeError, ValueError, configparser.NoOptionError):
include_my_renotes = False
try:
exclude_nsfw = check_str_to_bool(config.get("markov", "excludeNsfw"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_nsfw = True
try:
exclude_links = check_str_to_bool(config.get("markov", "exclude_links"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_links = False
run = True
oldnote = ""
base_url = "https://" + instance + "/api/v1/accounts/" + userid + "/statuses?limit=20&exclude_replies="\
+ str(not include_replies)
if init:
url = base_url
else:
url = base_url + "&since_id=" + since_id
while run:
if (init and len(notes_list) >= min_notes) or (oldnote == note_id):
break
try:
req = requests.get(url)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Posts! " + str(err))
sys.exit(1)
for jsonObj in req.json():
notes_list.append(jsonObj)
if len(notes_list) == 0:
print("No new notes to load!")
return []
oldnote = note_id
note_id = notes_list[len(notes_list)-1]["id"]
if init:
url = base_url + "&max_id=" + note_id
else:
url = base_url + "&since_id=" + since_id + "&max_id=" + note_id
print(str(len(notes_list)) + " Notes read.")
print("Processing notes...")
for element in notes_list:
last_time = element["created_at"]
last_timestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000)
content = element["content"]
if content == "" and element["reblog"] is None: # Skips empty notes
continue
elif content == "" and element["reblog"] is not None:
if include_my_renotes: # Add Renotes to Database (if wanted)
content = element["reblog"]["content"]
content = content.replace(chr(8203), "")
else:
continue
if element["spoiler_text"] != "" and exclude_nsfw:
continue
else:
content = element["spoiler_text"] + " " + content
content = regex.sub(r"<[^>]+>", '', content) # Remove HTML tags in Note
content = regex.sub(r"([.,!?])", r"\1 ", content) # Add spaces behind punctuation mark
content = regex.sub(r"\s{2,}", " ", content) # Remove double spaces
content = regex.sub(r"(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?", '', content) # Remove instance name with regular
# expression
content = content.replace("::", ": :") # Break long emoji chains
content = content.replace("@", "@" + chr(8203)) # Add no-length-space behind @
if exclude_links:
content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))",
"", content)
note_dict = {"id": element["id"], "text": content, "timestamp": last_timestamp, "user_id": userid}
return_list.append(note_dict)
return return_list
def pleroma_get_user_id(username: str, instance: str) -> str:
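    """Look up a user's account ID on a Pleroma instance (Mastodon-compatible API)."""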
    # Pleroma uses the Mastodon API so as a shortcut I just reuse the Mastodon function
    return mastodon_get_user_id(username, instance)


def pleroma_get_notes(**kwargs):
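    """Fetch notes from a Pleroma instance by delegating to mastodon_get_notes."""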
    return_list = []
    username = kwargs["username"]
    instance = kwargs["instance"]
    if kwargs:
        if "min_notes" in kwargs:
            return_list = mastodon_get_notes(username=username, instance=instance, min_notes=kwargs["min_notes"])
        elif "lastnote" in kwargs:
            return_list = mastodon_get_notes(username=username, instance=instance, lastnote=kwargs["lastnote"])
        else:
            print("Wrong arguments given!")
            print("Exiting routine!")
            return
    else:
        print("No arguments given!")
        print("Exiting routine")
        return None
    return return_list


def get_user_id(username: str, instance: str) -> str:
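    """Resolve a user ID regardless of the server software running on `instance`."""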
    # Determine API endpoint
    api = get_endpoint(instance)
    # Determine how to get the user ID on the detected software
    if api == "Misskey":
        return misskey_get_user_id(username, instance)
    elif api == "Mastodon":
        return mastodon_get_user_id(username, instance)
    elif api == "Pleroma":
        return pleroma_get_user_id(username, instance)
    else:
        print("Domain isn't Misskey, Pleroma or Mastodon!\nCheck spelling of the domain!")
        sys.exit(1)


def calculate_markov_chain():
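    """Build a Markov chain from the newest notes in roboduck.db and write it to markov.json."""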
text = ""
# Load configuration
config = configparser.ConfigParser()
config.read(Path(__file__).parent.joinpath('bot.cfg'))
try:
max_notes = config.get("markov", "max_notes")
except (TypeError, ValueError, configparser.NoOptionError):
max_notes = "10000"
databasepath = Path(__file__).parent.joinpath('roboduck.db')
if not (os.path.exists(databasepath) and os.stat(databasepath).st_size != 0):
print("Roboduck database not already created!")
print("Exit initialization!")
sys.exit(0)
with open(databasepath, 'r', encoding='utf-8'):
database = sqlite3.connect(databasepath)
data = database.cursor()
data.execute("SELECT text FROM notes ORDER BY timestamp DESC LIMIT " + max_notes + ";")
rows = data.fetchall()
for row in rows:
text += row[0] + "\n"
markovchain = markovify.Text(text)
markovchain.compile(inplace=True)
markov_json = markovchain.to_json()
with open(Path(__file__).parent.joinpath('markov.json'), "w", encoding="utf-8") as markov:
json.dump(markov_json, markov)
def clean_database():
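    """Delete all but the newest `max_notes` notes per configured user from roboduck.db."""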
    databasepath = Path(__file__).parent.joinpath('roboduck.db')
    if not (os.path.exists(databasepath) and os.stat(databasepath).st_size != 0):
        print("No database found!")
        print("Please run Bot first!")
        sys.exit(0)
    with open(databasepath, "a", encoding="utf-8"):
        database = sqlite3.connect(databasepath)
        # Reading config file bot.cfg with config parser
        config = configparser.ConfigParser()
        config.read(Path(__file__).parent.joinpath('bot.cfg'))
        # print(Path(__file__).parent.joinpath('bot.cfg'))
        try:
            max_notes = config.get("markov", "max_notes")
        except (TypeError, ValueError, configparser.NoOptionError):
            max_notes = "10000"
        for user in config.get("misskey", "users").split(";"):
            username = user.split("@")[1]
            instance = user.split("@")[2]
            userid = get_user_id(username, instance)
            data = database.cursor()
            data.execute(
                "DELETE FROM notes WHERE user_id=:user_id AND id NOT IN (SELECT id FROM notes WHERE user_id=:user_id "
                "ORDER BY timestamp DESC LIMIT :max );",
                {"user_id": userid, "max": int(max_notes)})
        database.commit()
        database.close()


def create_sentence():
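    """Generate one sentence from the saved Markov chain, using the tuning options in bot.cfg."""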
    with open(os.path.join(Path(__file__).parent, 'markov.json'), "r", encoding="utf-8") as markov:
        markov_json = json.load(markov)
    text_model = markovify.Text.from_json(markov_json)
    # Reading config file bot.cfg with config parser
    config = configparser.ConfigParser()
    config.read(Path(__file__).parent.joinpath('bot.cfg'))
    # Read & sanitize inputs
    try:
        test_output = check_str_to_bool(config.get("markov", "test_output"))
    except (TypeError, ValueError, configparser.NoOptionError):
        # print("test_output: " + str(err))
        test_output = True
    if test_output:
        try:
            tries = int(config.get("markov", "tries"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print("tries: " + str(err))
            tries = 250
        try:
            max_overlap_ratio = float(config.get("markov", "max_overlap_ratio"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print("max_overlap_ratio: " + str(err))
            max_overlap_ratio = 0.7
        try:
            max_overlap_total = int(config.get("markov", "max_overlap_total"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print("max_overlap_total: " + str(err))
            max_overlap_total = 10
        try:
            max_words = int(config.get("markov", "max_words"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print("max_words: " + str(err))
            max_words = None
        try:
            min_words = int(config.get("markov", "min_words"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print("min_words: " + str(err))
            min_words = None
        if max_words is not None and min_words is not None:
            if min_words >= max_words:
                # print("min_words (" + str(min_words) + ") bigger than max_words (" + str(max_words) + ")! Swapping values!")
                swap = min_words
                min_words = max_words
                max_words = swap
    else:
        tries = 250
        max_overlap_ratio = 0.7
        max_overlap_total = 15
        max_words = None
        min_words = None
    """
    # Debug section to print the used values
    print("These values are used:")
    print("test_output: " + str(test_output))
    print("tries: " + str(tries))
    print("max_overlap_ratio: " + str(max_overlap_ratio))
    print("max_overlap_total: " + str(max_overlap_total))
    print("max_words: " + str(max_words))
    print("min_words: " + str(min_words))
    """
    # Applying inputs
    note = text_model.make_sentence(
        test_output=test_output,
        tries=tries,
        max_overlap_ratio=max_overlap_ratio,
        max_overlap_total=max_overlap_total,
        max_words=max_words,
        min_words=min_words
    )
    if note is not None:
        return note
    else:
        return "Error in markov chain sentence creation: Couldn't calculate sentence!\n\n☹ Please try again! "


def update():
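    """Fetch notes newer than the last stored one for every configured user, then rebuild the chain."""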
    notes_list = []
    databasepath = Path(__file__).parent.joinpath('roboduck.db')
    if not (os.path.exists(databasepath) and os.stat(databasepath).st_size != 0):
        print("No database found!")
        print("Please run Bot first!")
        sys.exit(0)
    with open(databasepath, "a", encoding="utf-8"):
        database = sqlite3.connect(databasepath)
        print("Connected to roboduck.db successfully...")
        config = configparser.ConfigParser()
        config.read(Path(__file__).parent.joinpath('bot.cfg'))
        for user in config.get("misskey", "users").split(";"):
            username = user.split("@")[1]
            instance = user.split("@")[2]
            userid = get_user_id(username, instance)
            data = database.cursor()
            data.execute(
                "SELECT id FROM notes WHERE timestamp = (SELECT MAX(timestamp) FROM notes WHERE user_id=:user_id) AND "
                "user_id=:user_id;",
                {"user_id": userid})
            since_note = data.fetchone()[0]
            api = get_endpoint(instance)
            if api == "Misskey":
                notes_list.extend(misskey_get_notes(lastnote=since_note, username=username, instance=instance))
            elif api == "Mastodon":
                notes_list.extend(mastodon_get_notes(lastnote=since_note, username=username, instance=instance))
            elif api == "Pleroma":
                notes_list.extend(pleroma_get_notes(lastnote=since_note, username=username, instance=instance))
            else:
                print("BIG ERROR!")
        if len(notes_list) == 0:
            database.close()
            return
        print("Inserting new notes into database...")
        database.executemany("INSERT OR IGNORE INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?)",
                             [(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notes_list])
        database.commit()
        print("Notes updated!")
        database.close()
    print("Cleaning database...")
    clean_database()
    print("Database cleaned!")
    print("Short sleep to prevent file collision...")
    sleep(10)
    print("Calculating new Markov Chain...")
    calculate_markov_chain()
    print("Markov Chain saved!")
    print("\nUpdate done!")


def init_bot():
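    """Create roboduck.db, import the initial set of notes for every configured user and build the first chain."""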
    databasepath = Path(__file__).parent.joinpath('roboduck.db')
    if os.path.exists(databasepath) and os.stat(databasepath).st_size != 0:
        print("Roboduck database already created!")
        print("Exiting initialization!")
        sys.exit(0)
    print("Creating database...")
    with open(databasepath, "w+", encoding="utf-8"):
        database = sqlite3.connect(databasepath)
        print("Connected to roboduck.db successfully...")
        print("Creating Table...")
        database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text TEXT, timestamp INT, user_id CHAR(20));")
        print("Table NOTES created...")
        # Load configuration
        config = configparser.ConfigParser()
        config.read(Path(__file__).parent.joinpath('bot.cfg'))
        try:
            init_notes = int(config.get("markov", "min_notes"))
        except (TypeError, ValueError, configparser.NoOptionError):
            # print(err)
            init_notes = 1000
        for user in config.get("misskey", "users").split(";"):
            print("Trying to read the first " + str(init_notes) + " notes for " + user + ".")
            username = user.split("@")[1]
            instance = user.split("@")[2]
            api = get_endpoint(instance)
            print(instance + " is a " + api + " instance.")
            if api == "Misskey":
                notes_list = misskey_get_notes(min_notes=init_notes, username=username, instance=instance)
            elif api == "Mastodon":
                notes_list = mastodon_get_notes(min_notes=init_notes, username=username, instance=instance)
            elif api == "Pleroma":
                notes_list = pleroma_get_notes(min_notes=init_notes, username=username, instance=instance)
            else:
                print("BIG ERROR!")
            print("Writing notes into database...")
            database.executemany("INSERT INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?)",
                                 [(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notes_list])
            database.commit()
        database.close()
    print("Notes written...")
    print("Creating Markov Chain")
    calculate_markov_chain()
    print("Markov Chain calculated & saved.\n")
    print("Finished initialization!\n")
    print("The bot will now be started!")