import psycopg2
import requests
from sshtunnel import SSHTunnelForwarder
from psycopg2.extras import RealDictCursor


class BotInstance:
    """Post reports about who blocked a Misskey account as Misskey notes.

    Block rows are read from the ``blocking``, ``user`` and ``user_profile``
    tables of a PostgreSQL database (optionally reached through an SSH
    tunnel), rendered as text and published via the ``notes/create`` API.
    """

    # Misskey API settings.
    mk_url = None
    mk_token = None
    mk_user = None

    # Whether the database is reached through an SSH tunnel.
    use_ssh = True

    # Note text fragments and note options.
    msg_text = None
    msg_visibility = None
    msg_local_only = True
    msg_no_block = None
    msg_block_count = None
    msg_latest_block = None

    # SSH tunnel settings.
    ssh_address = None
    ssh_port = None
    ssh_key = None
    ssh_user = None
    ssh_pass = None

    # Database settings.
    db_name = None
    db_user = None
    db_pass = None
    db_port = None
    db_host = None
    db_params = None

    # Seconds to wait for the Misskey API before giving up on a request.
    request_timeout = 30

    # Tunnels opened by get_db_cursor() and not stopped yet. The class-level
    # default is an immutable tuple so that instances never share state.
    _tunnels = ()

    # Address used to reach the local end of an SSH tunnel.
    _TUNNEL_HOST = "127.0.0.1"

    # Common head of every block query. It selects the columns blocker,
    # blockee, blocker_url and block_date; the quoted username is appended
    # right after the trailing "like ".
    _BLOCKS_SELECT = (
        "select concat(that.username,'@',that.host ) as blocker, "
        "this.username as blockee, "
        "there.url as blocker_url,"
        "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date "
        "from blocking b "
        "join \"user\" this on this.id = b.\"blockeeId\" "
        "join \"user\" that on that.id = b.\"blockerId\" "
        "join \"user_profile\" there on there.\"userId\" = b.\"blockerId\" "
        "where this.\"username\" like "
    )

    # report type -> (name of the report method, header template).
    _PERIOD_REPORTS = {
        "DAILY": ("daily_blocks_report", "\nYesterday's blocks: %d\n"),
        "WEEKLY": ("weekly_blocks_report", "\nThis week's blocks: %d\n"),
        "MONTHLY": ("monthly_blocks_report", "\nThis month's blocks: %d\n"),
        "ALL_TIME": ("all_time_blocks_report", "\nAll time blocks: %d\n"),
    }

    def __init__(self, config):
        """Copy the settings out of the nested *config* mapping.

        Required keys: ``mk_url``, ``mk_username``, ``mk_token``, ``use_ssh``,
        ``message`` (``text``, ``visibility``, ``local_only``,
        ``no_block_text``, ``block_count_text``, ``last_block_text``),
        ``tunnel`` (``address``, ``port``, ``key``, ``user``, ``password``)
        and ``db`` (``name``, ``user``, ``password``, ``host``, ``port``).

        Raises:
            KeyError: if any of the keys above is missing.
        """
        self.mk_url = config["mk_url"]
        self.mk_user = config["mk_username"]
        self.mk_token = config["mk_token"]
        self.use_ssh = config["use_ssh"]

        message = config["message"]
        self.msg_text = message["text"]
        self.msg_visibility = message["visibility"]
        self.msg_local_only = message["local_only"]
        self.msg_no_block = message["no_block_text"]
        self.msg_block_count = message["block_count_text"]
        self.msg_latest_block = message["last_block_text"]

        tunnel = config["tunnel"]
        self.ssh_address = tunnel["address"]
        self.ssh_port = tunnel["port"]
        self.ssh_key = tunnel["key"]
        self.ssh_user = tunnel["user"]
        self.ssh_pass = tunnel["password"]

        database = config["db"]
        self.db_name = database["name"]
        self.db_user = database["user"]
        self.db_pass = database["password"]
        self.db_host = database["host"]
        self.db_port = database["port"]

    def __repr__(self):
        return "Bot instance!"

    # Database Conn, Server and Cursor
    def start_tunnel(self):
        """Build (but do not start) the SSH tunnel to the database host.

        Returns:
            An ``SSHTunnelForwarder`` forwarding to ``(db_host, db_port)`` as
            seen from the SSH server; the caller must ``start()`` it.
        """
        return SSHTunnelForwarder(
            (self.ssh_address, self.ssh_port),
            # ``ssh_private_key`` is the deprecated spelling of this argument.
            ssh_pkey=self.ssh_key,
            ssh_username=self.ssh_user,
            ssh_password=self.ssh_pass,
            remote_bind_address=(self.db_host, self.db_port),
        )

    def stop_tunnels(self):
        """Stop every SSH tunnel opened by ``get_db_cursor`` on this instance.

        Does nothing when no tunnel is open.
        """
        tunnels, self._tunnels = self._tunnels, ()
        for tunnel in tunnels:
            tunnel.stop()

    def get_db_cursor(self):
        """Open a database connection, through an SSH tunnel if configured.

        The caller owns the returned objects: close the cursor and the
        connection, then call ``stop_tunnels()`` to shut down the tunnel.

        Returns:
            A ``(connection, cursor)`` pair; the cursor yields rows as
            dictionaries (``RealDictCursor``).
        """
        host, port = self.db_host, self.db_port
        server = None
        if self.use_ssh:
            server = self.start_tunnel()
            server.start()
            # The tunnel listens on this machine; ``db_host`` is only
            # meaningful from the SSH server's side of the tunnel.
            host, port = self._TUNNEL_HOST, server.local_bind_port

        try:
            conn = psycopg2.connect(
                database=self.db_name,
                user=self.db_user,
                password=self.db_pass,
                host=host,
                port=port,
                cursor_factory=RealDictCursor,
            )
        except Exception:
            # Do not leave a tunnel running for a connection that never opened.
            if server is not None:
                server.stop()
            raise

        if server is not None:
            self._tunnels = [*self._tunnels, server]
        return conn, conn.cursor()

    # Database Queries
    def _blocks_query(self, window=None, latest_only=False):
        """Build the SQL text shared by all the ``*_query`` methods.

        Args:
            window: PostgreSQL interval text (e.g. ``"1 day"``) limiting the
                rows to that span before ``current_date``; ``None`` for no
                date filter.
            latest_only: keep only the most recently created block.
        """
        # NOTE(review): the username is interpolated into the SQL text and
        # compared with LIKE, so a quote breaks the statement and "_" / "%"
        # act as wildcards. Kept as-is because the public query methods
        # return ready-to-run SQL text; confirm usernames are always plain.
        sql = self._BLOCKS_SELECT + "'{0}' ".format(self.mk_user)
        if window is not None:
            sql += (
                "and b.\"createdAt\" between current_date - interval '"
                + window
                + "' and current_date "
            )
        sql += "and this.host is null"
        if latest_only:
            sql += " order by b.\"createdAt\" desc limit 1"
        return sql + ";"

    def daily_blocks_query(self):
        """Return the SQL for blocks created in the day before ``current_date``."""
        return self._blocks_query(window="1 day")

    def weekly_blocks_query(self):
        """Return the SQL for blocks created in the 7 days before ``current_date``."""
        return self._blocks_query(window="7 day")

    def monthly_blocks_query(self):
        """Return the SQL for blocks created in the month before ``current_date``."""
        return self._blocks_query(window="1 month")

    def all_blocks_query(self):
        """Return the SQL for every block, without a date filter."""
        return self._blocks_query()

    def latest_block_query(self):
        """Return the SQL for the single most recently created block."""
        return self._blocks_query(latest_only=True)

    # Database Reports
    def _run_report(self, period_query=None):
        """Run the all-time, latest and (optionally) period queries.

        Args:
            period_query: SQL text of the period query, or ``None`` to reuse
                the all-time rows as the period rows.

        Returns:
            ``(all_blocks, latest_block, period_blocks)`` as lists of rows.
        """
        sql_all_time = self.all_blocks_query()
        sql_latest = self.latest_block_query()

        conn, cursor = self.get_db_cursor()
        try:
            cursor.execute(sql_all_time)
            all_blocks = cursor.fetchall()

            cursor.execute(sql_latest)
            latest_block = cursor.fetchall()

            if period_query is None:
                period_blocks = all_blocks
            else:
                cursor.execute(period_query)
                period_blocks = cursor.fetchall()
        finally:
            # Release everything even when a query fails.
            try:
                cursor.close()
                conn.close()
            finally:
                self.stop_tunnels()
        return all_blocks, latest_block, period_blocks

    def daily_blocks_report(self):
        """Return ``(all_blocks, latest_block, daily_blocks)``."""
        return self._run_report(self.daily_blocks_query())

    def weekly_blocks_report(self):
        """Return ``(all_blocks, latest_block, weekly_blocks)``."""
        return self._run_report(self.weekly_blocks_query())

    def monthly_blocks_report(self):
        """Return ``(all_blocks, latest_block, monthly_blocks)``."""
        return self._run_report(self.monthly_blocks_query())

    def all_time_blocks_report(self):
        """Return ``(all_blocks, latest_block, all_blocks)``."""
        return self._run_report()

    def block_reports(self, report_type, log_file):
        """Build the requested report and post it as a note.

        Args:
            report_type: ``"DAILY"``, ``"WEEKLY"``, ``"MONTHLY"`` or
                ``"ALL_TIME"`` for a listing followed by the summary lines,
                or ``"STATISTICS"`` for the summary lines alone.
            log_file: file object that receives Misskey error messages.

        Raises:
            ValueError: if *report_type* is not one of the values above.
        """
        if report_type == "STATISTICS":
            # No listing: only the latest block and the all-time count.
            all_time_blocks, latest_block, _ = self.all_time_blocks_report()
            report_header = ""
            content = ""
        elif report_type in self._PERIOD_REPORTS:
            method_name, header = self._PERIOD_REPORTS[report_type]
            all_time_blocks, latest_block, blocks = getattr(self, method_name)()
            report_header = header % len(blocks)
            content = self.parse_block_list(result=blocks, bullet="* ")
        else:
            raise ValueError("Unknown report type: %r" % (report_type,))

        # An empty label in the configuration disables that summary line.
        msg_latest = ""
        if self.msg_latest_block:
            msg_latest = "\n%s %s" % (
                self.msg_latest_block,
                self.parse_block_list(latest_block),
            )
        msg_all_time = ""
        if self.msg_block_count:
            msg_all_time = "%s %d\n" % (self.msg_block_count, len(all_time_blocks))

        self.post_note(report_header + content + msg_latest + msg_all_time, log_file)

    def parse_block_list(self, result, bullet=""):
        """Render block rows as text, one line per row.

        Args:
            result: rows with the keys ``blocker``, ``block_date`` and
                ``blocker_url``; ``None`` is treated like an empty list.
            bullet: prefix placed in front of every line.

        Returns:
            The rendered lines, or the configured "no block" text followed
            by a newline when there are no rows.
        """
        if not result:
            return self.msg_no_block + "\n"
        return "".join(
            "%s %s (%s) - URL: %s\n"
            % (bullet, row["blocker"], row["block_date"], row["blocker_url"])
            for row in result
        )

    # Misskey Post
    def post_note(self, content="", log_file=None):
        """Publish ``msg_text + content`` as a note.

        A response other than HTTP 200 is reported on *log_file* (standard
        output when it is ``None``); nothing is raised for it.

        Args:
            content: text appended to the configured note text.
            log_file: file object that receives the error message.
        """
        payload = {
            "visibility": self.msg_visibility,
            "text": "%s%s" % (self.msg_text, content),
            "localOnly": self.msg_local_only,
            "i": self.mk_token,
        }
        # Accept the base URL with or without a trailing slash.
        endpoint = self.mk_url.rstrip("/") + "/notes/create"
        response = requests.post(
            endpoint, json=payload, timeout=self.request_timeout
        )
        if response.status_code != 200:
            try:
                detail = response.json()["error"]["message"]
            except (ValueError, KeyError, TypeError):
                # The body is not the expected JSON error document.
                detail = "HTTP %d %s" % (response.status_code, response.text)
            print("Misskey error: %s" % detail, file=log_file)