You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

329 lines
11 KiB

import psycopg2
import requests
from sshtunnel import SSHTunnelForwarder
from psycopg2.extras import RealDictCursor
class BotInstance:
mk_url = None
mk_token = None
mk_user = None
use_ssh = True
msg_text = None
msg_visibility = None
msg_local_only = True
msg_no_block_text = None
ssh_address = None
ssh_key = None
ssh_user = None
ssh_password = None
db_name = None
db_user = None
db_pass = None
db_port = None
db_host = None
db_params = None
def __init__(self, config):
self.mk_url = config["mk_url"]
self.mk_user = config["mk_username"]
self.mk_token = config["mk_token"]
self.use_ssh = config["use_ssh"]
self.msg_text = config["message"]["text"]
self.msg_visibility = config["message"]["visibility"]
self.msg_local_only = config["message"]["localOnly"]
self.msg_no_block_text = config["message"]["no_block_text"]
self.ssh_address = config["tunnel"]["address"]
self.ssh_port = config["tunnel"]["port"]
self.ssh_key = config["tunnel"]["key"]
self.ssh_user = config["tunnel"]["user"]
self.ssh_password = config["tunnel"]["password"]
self.db_name = config["db"]["name"]
self.db_user = config["db"]["user"]
self.db_password = config["db"]["password"]
self.db_host = config["db"]["host"]
self.db_port = config["db"]["port"]
def __repr__(self):
return "Bot instance!"
# Database Conn, Server and Cursor
def start_tunnel(self):
tunnel = None
try:
print("Instantiate tunnel...")
tunnel = SSHTunnelForwarder(
(self.ssh_address, self.ssh_port),
ssh_private_key=self.ssh_key,
ssh_username=self.ssh_user,
ssh_password=self.ssh_password,
remote_bind_address=('localhost', 5432)
)
except Exception as e:
print("Error: ", e)
return tunnel
def get_db_cursor(self):
if self.use_ssh:
server = self.start_tunnel()
server.start()
port = server.local_bind_port
else:
port = self.db_port
params = {
'database': self.db_name,
'user': self.db_user,
'password': self.db_password,
'host': self.db_host,
'port': port,
'cursor_factory': RealDictCursor
}
conn = psycopg2.connect(**params)
cursor = conn.cursor()
# except Exception as e:
# print("Error: ", e)
return conn, cursor
# Database Queries
def get_all_blocks(self):
rows = None
try:
print("Get DBconnection and cursor...")
conn, cursor = self.get_db_cursor()
sql = "select concat(that.username,'@',that.host ) as blocker, " \
"this.username as blockee, " \
"to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \
"from blocking b " \
"join \"user\" this on this.id = b.\"blockeeId\" " \
"join \"user\" that on that.id = b.\"blockerId\" " \
"where this.\"username\" like \'{0}\' " \
"and this.host is null;".format(self.mk_user)
print("Execute SQL statement...")
cursor.execute(sql)
print("Fetch all!")
rows = cursor.fetchall()
print("Closing cursor...")
cursor.close()
print("Closing DB connection...")
conn.close()
except Exception as e:
print("Error: ", e)
return rows
def get_yesterdays_blocks(self):
print("Get today's blocks")
rows = None
try:
print("Get DBconnection and cursor...")
conn, cursor = self.get_db_cursor()
sql = "select concat(that.username,'@',that.host ) as blocker, " \
"this.username as blockee, " \
"to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \
"from blocking b " \
"join \"user\" this on this.id = b.\"blockeeId\" " \
"join \"user\" that on that.id = b.\"blockerId\" " \
"where this.\"username\" like \'{0}\' " \
"and b.\"createdAt\" between current_date - interval '1 day' and current_date " \
"and this.host is null;".format(self.mk_user)
print("Execute SQL statement...")
cursor.execute(sql)
print("Fetch all!")
rows = cursor.fetchall()
print("Closing cursor...")
cursor.close()
print("Closing DB connection...")
conn.close()
except Exception as e:
print("Error: ", e)
return rows
def get_this_weeks_blocks(self):
print("Get this week's blocks")
rows = None
try:
print("Get DBconnection and cursor...")
conn, cursor = self.get_db_cursor()
sql = "select concat(that.username,'@',that.host ) as blocker, " \
"this.username as blockee, " \
"to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \
"from blocking b " \
"join \"user\" this on this.id = b.\"blockeeId\" " \
"join \"user\" that on that.id = b.\"blockerId\" " \
"where this.\"username\" like \'{0}\' " \
"and b.\"createdAt\" between current_date - interval '7 day' and current_date " \
"and this.host is null;".format(self.mk_user)
print("Execute SQL statement...")
cursor.execute(sql)
print("Fetch all!")
rows = cursor.fetchall()
print("Closing cursor...")
cursor.close()
print("Closing DB connection...")
conn.close()
except Exception as e:
print("Error: ", e)
return rows
def get_this_months_blocks(self):
print("Get this month's blocks")
rows = None
try:
print("Get DBconnection and cursor...")
conn, cursor = self.get_db_cursor()
sql = "select concat(that.username,'@',that.host ) as blocker, " \
"this.username as blockee, " \
"to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \
"from blocking b " \
"join \"user\" this on this.id = b.\"blockeeId\" " \
"join \"user\" that on that.id = b.\"blockerId\" " \
"where this.\"username\" like \'{0}\' " \
"and b.\"createdAt\" between current_date - interval '1 month' and current_date " \
"and this.host is null;".format(self.mk_user)
print("Execute SQL statement...")
cursor.execute(sql)
print("Fetch all!")
rows = cursor.fetchall()
print("Closing cursor...")
cursor.close()
print("Closing DB connection...")
conn.close()
except Exception as e:
print("Error: ", e)
return rows
def get_latest_block(self):
print("Get latest block")
rows = None
try:
print("Get DBconnection and cursor...")
conn, cursor = self.get_db_cursor()
sql = "select concat(that.username,'@',that.host ) as blocker, " \
"this.username as blockee, " \
"to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date " \
"from blocking b " \
"join \"user\" this on this.id = b.\"blockeeId\" " \
"join \"user\" that on that.id = b.\"blockerId\" " \
"where this.\"username\" like \'{0}\' " \
"and this.host is null " \
"order by b.\"createdAt\" desc " \
"limit 1;".format(self.mk_user)
print("Execute SQL statement...")
cursor.execute(sql)
print("Fetch all!")
rows = cursor.fetchall()
print("Closing cursor...")
cursor.close()
print("Closing DB connection...")
conn.close()
except Exception as e:
print("Error: ", e)
return rows
# Reports
def parse_block_list(self, result, bullet=""):
block_line = ""
if len(result) == 0:
block_line += self.msg_no_block_text + "\n"
else:
for row in result:
blocker = row["blocker"]
block_date = row["block_date"]
block_line += "%s@%s (%s)\n" % (bullet, blocker, block_date)
return block_line
def block_reports(self, report_type):
all_time_blocks = self.get_all_blocks()
latest_block = self.get_latest_block()
match report_type:
case "DAILY":
result = self.get_yesterdays_blocks()
report_header = "\nYesterday's blocks: %d\n" % len(result)
case "WEEKLY":
result = self.get_this_weeks_blocks()
report_header = "\nThis week's blocks: %d\n" % len(result)
case "MONTHLY":
result = self.get_this_months_blocks()
report_header = "\nThis month's blocks: %d\n" % len(result)
case "ALL_TIME":
result = all_time_blocks
report_header = "\nAll time blocks: %d\n" % len(result)
case "STATISTICS":
result = None
report_header = ""
case _:
result = None
report_header = ""
content = self.parse_block_list(result, "* ")
msg_latest = "\nYou were last blocked by %s\n" % self.parse_block_list(latest_block)
msg_all_time = "You have been blocked by %d people so far! :kekw:\n" % len(all_time_blocks)
msg_content = report_header + content + msg_latest + msg_all_time
self.post_note(msg_content)
# Misskey Post
def post_note(self, content=""):
# Do all the things to create the message
message = self.msg_text % self.mk_user
note_content = "%s\n%s" % (message, content)
# The request paylod
payload = {
"visibility": self.msg_visibility,
"text": note_content,
"localOnly": self.msg_local_only,
"i": self.mk_token
}
# Send the request
create_note_request = requests.post(self.mk_url + "notes/create", json=payload)
# Some validation here, gotta add a logfile later smh
if create_note_request.status_code != 200:
print("Error: " + create_note_request.json()["error"]["message"], file=None)