import psycopg2 import requests from sshtunnel import SSHTunnelForwarder from psycopg2.extras import RealDictCursor class BotInstance: mk_url = None mk_token = None mk_user = None use_ssh = True msg_text = None msg_visibility = None msg_local_only = True msg_no_block = None msg_block_count = None ssh_address = None ssh_key = None ssh_user = None ssh_pass = None db_name = None db_user = None db_pass = None db_port = None db_host = None db_params = None def __init__(self, config): self.mk_url = config["mk_url"] self.mk_user = config["mk_username"] self.mk_token = config["mk_token"] self.use_ssh = config["use_ssh"] self.msg_text = config["message"]["text"] self.msg_visibility = config["message"]["visibility"] self.msg_local_only = config["message"]["local_only"] self.msg_no_block = config["message"]["no_block_text"] self.msg_block_count = config["message"]["block_count_text"] self.msg_latest_block = config["message"]["last_block_text"] self.ssh_address = config["tunnel"]["address"] self.ssh_port = config["tunnel"]["port"] self.ssh_key = config["tunnel"]["key"] self.ssh_user = config["tunnel"]["user"] self.ssh_pass = config["tunnel"]["password"] self.db_name = config["db"]["name"] self.db_user = config["db"]["user"] self.db_pass = config["db"]["password"] self.db_host = config["db"]["host"] self.db_port = config["db"]["port"] def __repr__(self): return "Bot instance!" # Database Conn, Server and Cursor def start_tunnel(self): tunnel = SSHTunnelForwarder( (self.ssh_address, self.ssh_port), ssh_private_key=self.ssh_key, ssh_username=self.ssh_user, ssh_password=self.ssh_pass, remote_bind_address=(self.db_host, self.db_port) ) return tunnel def get_db_cursor(self): if self.use_ssh: server = self.start_tunnel() server.start() port = server.local_bind_port else: port = self.db_port params = { 'database': self.db_name, 'user': self.db_user, 'password': self.db_pass, 'host': self.db_host, 'port': port, 'cursor_factory': RealDictCursor } conn = psycopg2.connect(**params) cursor = conn.cursor() return conn, cursor # Database Queries def get_all_blocks(self): conn, cursor = self.get_db_cursor() sql = "select concat(that.username,'@',that.host ) as blocker, " \ "this.username as blockee, " \ "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \ "from blocking b " \ "join \"user\" this on this.id = b.\"blockeeId\" " \ "join \"user\" that on that.id = b.\"blockerId\" " \ "where this.\"username\" like \'{0}\' " \ "and this.host is null;".format(self.mk_user) cursor.execute(sql) rows = cursor.fetchall() cursor.close() conn.close() return rows def get_yesterdays_blocks(self): conn, cursor = self.get_db_cursor() sql = "select concat(that.username,'@',that.host ) as blocker, " \ "this.username as blockee, " \ "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \ "from blocking b " \ "join \"user\" this on this.id = b.\"blockeeId\" " \ "join \"user\" that on that.id = b.\"blockerId\" " \ "where this.\"username\" like \'{0}\' " \ "and b.\"createdAt\" between current_date - interval '1 day' and current_date " \ "and this.host is null;".format(self.mk_user) cursor.execute(sql) rows = cursor.fetchall() cursor.close() conn.close() return rows def get_this_weeks_blocks(self): conn, cursor = self.get_db_cursor() sql = "select concat(that.username,'@',that.host ) as blocker, " \ "this.username as blockee, " \ "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \ "from blocking b " \ "join \"user\" this on this.id = b.\"blockeeId\" " \ "join \"user\" that on that.id = b.\"blockerId\" " \ "where this.\"username\" like \'{0}\' " \ "and b.\"createdAt\" between current_date - interval '7 day' and current_date " \ "and this.host is null;".format(self.mk_user) cursor.execute(sql) rows = cursor.fetchall() cursor.close() conn.close() return rows def get_this_months_blocks(self): conn, cursor = self.get_db_cursor() sql = "select concat(that.username,'@',that.host ) as blocker, " \ "this.username as blockee, " \ "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \ "from blocking b " \ "join \"user\" this on this.id = b.\"blockeeId\" " \ "join \"user\" that on that.id = b.\"blockerId\" " \ "where this.\"username\" like \'{0}\' " \ "and b.\"createdAt\" between current_date - interval '1 month' and current_date " \ "and this.host is null;".format(self.mk_user) cursor.execute(sql) rows = cursor.fetchall() cursor.close() conn.close() return rows def get_latest_block(self): conn, cursor = self.get_db_cursor() sql = "select concat(that.username,'@',that.host ) as blocker, " \ "this.username as blockee, " \ "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \ "from blocking b " \ "join \"user\" this on this.id = b.\"blockeeId\" " \ "join \"user\" that on that.id = b.\"blockerId\" " \ "where this.\"username\" like \'{0}\' " \ "and this.host is null " \ "order by b.\"createdAt\" desc " \ "limit 1;".format(self.mk_user) cursor.execute(sql) rows = cursor.fetchall() cursor.close() conn.close() return rows # Reports def parse_block_list(self, result, bullet=""): block_line = "" if len(result) == 0: block_line += self.msg_no_block + "\n" else: for row in result: blocker = row["blocker"] block_date = row["block_date"] block_line += "%s@%s (%s)\n" % (bullet, blocker, block_date) return block_line def block_reports(self, report_type, log_file): all_time_blocks = self.get_all_blocks() latest_block = self.get_latest_block() match report_type: case "DAILY": blocks = self.get_yesterdays_blocks() report_header = "\nYesterday's blocks: %d\n" % len(blocks) case "WEEKLY": blocks = self.get_this_weeks_blocks() report_header = "\nThis week's blocks: %d\n" % len(blocks) case "MONTHLY": blocks = self.get_this_months_blocks() report_header = "\nThis month's blocks: %d\n" % len(blocks) case "ALL_TIME": blocks = all_time_blocks report_header = "\nAll time blocks: %d\n" % len(blocks) case "STATISTICS": blocks = None report_header = "" case _: blocks = None report_header = "" content = self.parse_block_list(result=blocks, bullet="* ") msg_latest = "" if self.msg_latest_block == "" \ else "\n%s %s" % (self.msg_latest_block, self.parse_block_list(latest_block)) msg_all_time = "" if self.msg_block_count == "" \ else "%s %d\n" % (self.msg_block_count, len(all_time_blocks)) msg_content = report_header + content + msg_latest + msg_all_time self.post_note(msg_content, log_file) # Misskey Post def post_note(self, content="", log_file=None): note_content = "%s%s" % (self.msg_text, content) payload = { "visibility": self.msg_visibility, "text": note_content, "localOnly": self.msg_local_only, "i": self.mk_token } create_note_request = requests.post(self.mk_url + "notes/create", json=payload) if create_note_request.status_code != 200: print("Misskey error: " + create_note_request.json()["error"]["message"], file=log_file)