From b9868b7fba75425181c1c2015d2c416913570494 Mon Sep 17 00:00:00 2001 From: ente Date: Sun, 19 Jun 2022 16:27:33 +0200 Subject: [PATCH] Implemented reading from Mastodon & Pleroma --- .gitignore | 3 +- roboduck.py | 200 ++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 189 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 2f6867d..7749c17 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ test-rd.py bot.cfg __pycache__ mia-markov-chain -.venv \ No newline at end of file +.venv +notes.txt \ No newline at end of file diff --git a/roboduck.py b/roboduck.py index a337227..6aca6d3 100755 --- a/roboduck.py +++ b/roboduck.py @@ -24,7 +24,7 @@ def get_endpoint(instance: str) -> str: # Try Misskey url = "https://" + instance + "/api/ping" req = requests.post(url) - if req.status_code == 200: + if req.status_code == 200 and ("pong" in req.json()): return "Misskey" # Try Mastodon and Pleroma @@ -195,8 +195,142 @@ def mastodon_get_user_id(username: str, instance: str) -> str: return req.json()["id"] -def mastodon_get_notes(): - print("MASTODON'T NOTES!") # TODO Write routine to get Mastodon notes (check for limiting commands!) +def mastodon_get_notes(**kwargs): + note_id = "k" + since_id = "" + min_notes = 0 + notes_list = [] + return_list = [] + username = kwargs["username"] + instance = kwargs["instance"] + + print("Reading notes for @" + username + "@" + instance + ".") + if kwargs: + if "min_notes" in kwargs: + # print("min_notes found!") + init = True + min_notes = kwargs["min_notes"] + + elif "lastnote" in kwargs: + # print("Lastnote found!") + init = False + since_id = kwargs["lastnote"] + + else: + print("Wrong arguments given!") + print("Exiting routine!") + return + else: + print("No arguments given!") + print("Exiting routine") + return None + + # Load configuration + config = configparser.ConfigParser() + config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg')) + + userid = mastodon_get_user_id(username, instance) # Here are only Mastodon ID is necessary so no need to check + # endpoint again + + # Read & Sanitize Inputs from Config File + try: + include_replies = check_str_to_bool(config.get("markov", "includeReplies")) + except (TypeError, ValueError, configparser.NoOptionError): + include_replies = True + + try: + include_my_renotes = check_str_to_bool(config.get("markov", "includeMyRenotes")) + except (TypeError, ValueError, configparser.NoOptionError): + include_my_renotes = False + + try: + exclude_nsfw = check_str_to_bool(config.get("markov", "excludeNsfw")) + except (TypeError, ValueError, configparser.NoOptionError): + exclude_nsfw = True + + try: + exclude_links = check_str_to_bool(config.get("markov", "exclude_links")) + except (TypeError, ValueError, configparser.NoOptionError): + exclude_links = False + + run = True + oldnote = "" + + base_url = "https://" + instance + "/api/v1/accounts/" + userid + "/statuses?limit=20&exclude_replies="\ + + str(not include_replies) + + if init: + url = base_url + else: + url = base_url + "&since_id=" + since_id + + while run: + + if (init and len(notes_list) >= min_notes) or (oldnote == note_id): + break + + try: + req = requests.get(url) + req.raise_for_status() + except requests.exceptions.HTTPError as err: + print("Couldn't get Posts! " + str(err)) + sys.exit(1) + + for jsonObj in req.json(): + notes_list.append(jsonObj) + if len(notes_list) == 0: + print("No new notes to load!") + return [] + + oldnote = note_id + + note_id = notes_list[len(notes_list)-1]["id"] + + if init: + url = base_url + "&max_id=" + note_id + else: + url = base_url + "&since_id=" + since_id + "&max_id=" + note_id + + print(str(len(notes_list)) + " Notes read.") + print("Processing notes...") + + for element in notes_list: + last_time = element["created_at"] + last_timestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000) + + content = element["content"] + + if content == "" and element["reblog"] is None: # Skips empty notes + continue + elif content == "" and element["reblog"] is not None: + if include_my_renotes: # Add Renotes to Database (if wanted) + content = element["reblog"]["content"] + content = content.replace(chr(8203), "") + else: + continue + + if element["spoiler_text"] != "" and exclude_nsfw: + continue + else: + content = element["spoiler_text"] + " " + content + + content = regex.sub(r"<[^>]+>", '', content) # Remove HTML tags in Note + + content = regex.sub(r"([.,!?])", r"\1 ", content) # Add spaces behind punctuation mark + content = regex.sub(r"\s{2,}", " ", content) # Remove double spaces + content = regex.sub(r"(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?", '', content) # Remove instance name with regular + # expression + content = content.replace("::", ": :") # Break long emoji chains + content = content.replace("@", "@" + chr(8203)) # Add no-length-space behind @ + + if exclude_links: + content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))", + "", content) + + note_dict = {"id": element["id"], "text": content, "timestamp": last_timestamp, "user_id": userid} + return_list.append(note_dict) + + return return_list def pleroma_get_user_id(username: str, instance: str) -> str: @@ -204,9 +338,26 @@ def pleroma_get_user_id(username: str, instance: str) -> str: return mastodon_get_user_id(username, instance) -def pleroma_get_notes(): - print("Pleroma notes!") # TODO Write routine to get Pleroma notes (check for limiting commands) +def pleroma_get_notes(**kwargs): + return_list = [] + username = kwargs["username"] + instance = kwargs["instance"] + if kwargs: + if "min_notes" in kwargs: + return_list = mastodon_get_notes(username=username, instance=instance, min_notes=kwargs["min_notes"]) + elif "lastnote" in kwargs: + return_list = mastodon_get_notes(username=username, instance=instance, lastnote=kwargs["lastnote"]) + else: + print("Wrong arguments given!") + print("Exiting routine!") + return + else: + print("No arguments given!") + print("Exiting routine") + return None + + return return_list def get_user_id(username: str, instance: str) -> str: # Determine API endpoint @@ -393,7 +544,7 @@ def update(): with open(databasepath, "a", encoding="utf-8"): database = sqlite3.connect(databasepath) - print("Connected to roboduck.db succesfull...") + print("Connected to roboduck.db successful...") config = configparser.ConfigParser() config.read(Path(__file__).parent.joinpath('bot.cfg')) @@ -409,7 +560,16 @@ def update(): since_note = data.fetchone()[0] - notes_list.extend(misskey_get_notes(lastnote=since_note, username=username, instance=instance)) + api = get_endpoint(instance) + + if api == "Misskey": + notes_list.extend(misskey_get_notes(lastnote=since_note, username=username, instance=instance)) + elif api == "Mastodon": + notes_list.extend(mastodon_get_notes(lastnote=since_note, username=username, instance=instance)) + elif api == "Pleroma": + notes_list.extend(pleroma_get_notes(lastnote=since_note, username=username, instance=instance)) + else: + print("BIG ERROR!") if notes_list == 0: database.close() @@ -427,7 +587,7 @@ def update(): clean_database() print("Database cleaned!") - print("Short sleep to prevent file collison...") + print("Short sleep to prevent file collision...") sleep(10) print("Calculating new Markov Chain...") @@ -451,7 +611,7 @@ def init_bot(): print("Connected to roboduck.db successful...") print("Creating Table...") - database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text CHAR(5000), timestamp INT, user_id CHAR(10));") + database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text TEXT, timestamp INT, user_id CHAR(20));") print("Table NOTES created...") @@ -459,15 +619,29 @@ def init_bot(): config = configparser.ConfigParser() config.read(Path(__file__).parent.joinpath('bot.cfg')) try: - initnotes = int(config.get("markov", "min_notes")) + init_notes = int(config.get("markov", "min_notes")) except (TypeError, ValueError): # print(err) - initnotes = 1000 + init_notes = 1000 for user in config.get("misskey", "users").split(";"): - print("Try reading first " + str(initnotes) + " notes for " + user + ".") + print("Try reading first " + str(init_notes) + " notes for " + user + ".") + + username = user.split("@")[1] + instance = user.split("@")[2] + + api = get_endpoint(instance) - notes_list = misskey_get_notes(min_notes=initnotes, username=user.split("@")[1], instance=user.split("@")[2]) + print(instance + " is a " + api + " instance.") + + if api == "Misskey": + notes_list = misskey_get_notes(min_notes=init_notes, username=username, instance=instance) + elif api == "Mastodon": + notes_list = mastodon_get_notes(min_notes=init_notes, username=username, instance=instance) + elif api == "Pleroma": + notes_list = pleroma_get_notes(min_notes=init_notes, username=username, instance=instance) + else: + print("BIG ERROR!") print("Writing notes into database...")