import psycopg2
from sshtunnel import SSHTunnelForwarder


class DbQuery:
    """Holder for SSH connection settings.

    NOTE(review): none of the functions in this file read these attributes
    yet; they all use the hard-coded placeholder values below.
    """

    ssh_address = None
    ssh_key = None
    ssh_user = None
    ssh_password = None

    def __init__(self):
        print("OHA")


# Shared SELECT/JOIN part of the blocking query; each caller appends its own
# WHERE clause.
_BLOCKING_SELECT = (
    "select concat(that.username,'@',that.host ) as blocker, "
    "this.username as blockee, "
    "to_date(to_char(b.\"createdAt\", 'YYYY/MM/DD'), 'YYYY/MM/DD') as block_date "
    "from blocking b "
    "join \"user\" this on this.id = b.\"blockeeId\" "
    "join \"user\" that on that.id = b.\"blockerId\" "
)


def _build_tunnel():
    """Return a configured, not yet started, SSH tunnel to remote port 5432."""
    return SSHTunnelForwarder(
        ('ssh-address', 22),
        ssh_private_key="/path/to/private/key",
        # in my case, I used a password instead of a private key
        ssh_username="ssh-user",
        ssh_password="ssh-pasword",
        remote_bind_address=('localhost', 5432),
    )


def _connection_params(local_port):
    """Return psycopg2 connection keyword arguments for the tunnel's local port."""
    return {
        'database': 'database-name',
        'user': 'database-user',
        'password': 'database-password',
        'host': 'localhost',
        'port': local_port,
    }


def connect_db() -> None:
    """Open an SSH tunnel, run the blocking query for username '' and print the rows.

    Any exception is caught and reported with ``print``; nothing is re-raised.
    The cursor, connection and tunnel are released even when a step fails.
    """
    try:
        # Entering the context manager starts the tunnel, so no explicit
        # server.start() is needed; leaving it stops the tunnel.
        with _build_tunnel() as server:
            print("server connected")

            conn = psycopg2.connect(**_connection_params(server.local_bind_port))
            try:
                # The cursor context manager closes the cursor on exit.
                with conn.cursor() as curs:
                    print("database connected")
                    sql = _BLOCKING_SELECT + (
                        "where this.username = '' "
                        "and this.host is null;"
                    )
                    print(sql)
                    curs.execute(sql)
                    rows = curs.fetchall()
                    print(rows)
            finally:
                conn.close()
    except Exception as e:
        print("Connection Failed: ", e)


def another_db_conn(user: str) -> None:
    """Run the blocking query for blockee ``user`` over an SSH tunnel and print the rows.

    Selects rows from ``blocking`` whose blockee has username ``user`` and a
    NULL host. Progress messages are printed before each step.

    Args:
        user: Username to match against the blockee. It is sent to the
            database as a bound parameter, never interpolated into the SQL
            text, so it cannot alter the statement.

    Any exception is caught and reported with ``print``; nothing is re-raised.
    The cursor, connection and tunnel are released even when a step fails.
    """
    print("OTHER!")
    try:
        print("TRY!")
        server = _build_tunnel()
        print("start server...")
        server.start()
        try:
            print("other shit...")
            params = _connection_params(server.local_bind_port)
            print("start connection...")
            conn = psycopg2.connect(**params)
            try:
                print("open cursor...")
                with conn.cursor() as curs:
                    print("database connected...")
                    # %s is a psycopg2 placeholder; the driver quotes the value.
                    sql = _BLOCKING_SELECT + (
                        "where this.username = %s "
                        "and this.host is null;"
                    )
                    print("executing sql statement...")
                    curs.execute(sql, (user,))
                    print("fetching results...")
                    rows = curs.fetchall()
                    print(rows)
            finally:
                conn.close()
        finally:
            # Stop the tunnel even if connecting or querying failed.
            server.stop()
    except Exception as e:
        print("AAAA", e)


def init_db_conn() -> None:
    """Placeholder: only prints a marker message."""
    print("init db conn!")


def start_ssh_tunnel() -> None:
    """Placeholder: only prints a marker message."""
    print("start ssh tunnel!")


def load_user_config() -> None:
    """Placeholder: only prints a marker message."""
    print("user config!")


def main() -> None:
    """Print a greeting and run the blocking query for a fixed username."""
    print("Hello world!")
    another_db_conn("captain_arepa")


# Run main function
if __name__ == "__main__":
    main()