@ -12,13 +12,14 @@ from time import sleep
def check_str_to_bool ( text ) - > bool :
if ( text == " True " or text == " true " or text == " TRUE " ) :
if text == " True " or text == " true " or text == " TRUE " :
return True
elif ( text == " False " or text == " false " or text == " FALSE " ) :
elif text == " False " or text == " false " or text == " FALSE " :
return False
else :
return True
def get_user_id ( username , instance ) :
url = " https:// " + instance + " /api/users/show "
try :
@ -29,23 +30,24 @@ def get_user_id(username, instance):
sys . exit ( 1 )
return req . json ( ) [ " id " ]
def get_notes ( * * kwargs ) :
note id = " k "
note _ id = " k "
sinceid = " "
min_notes = 0
notes L ist = [ ]
return L ist = [ ]
notes _l ist = [ ]
return _l ist = [ ]
username = kwargs [ " username " ]
instance = kwargs [ " instance " ]
print ( " Reading notes for @ " + username + " @ " + instance + " . " )
if ( kwargs ) :
if ( " min_notes " in kwargs ) :
if kwargs :
if " min_notes " in kwargs :
# print("min_notes found!")
init = True
min_notes = kwargs [ " min_notes " ]
elif ( " lastnote " in kwargs ) :
elif " lastnote " in kwargs :
# print("Lastnote found!")
init = False
sinceid = kwargs [ " lastnote " ]
@ -68,60 +70,71 @@ def get_notes(**kwargs):
# Read & Sanitize Inputs from Config File
try :
include R eplies = check_str_to_bool ( config . get ( " markov " , " includeReplies " ) )
except ( TypeError , ValueError ) as err :
include R eplies = True
include _r eplies = check_str_to_bool ( config . get ( " markov " , " includeReplies " ) )
except ( TypeError , ValueError ) :
include _r eplies = True
try :
include MyR enotes = check_str_to_bool ( config . get ( " markov " , " includeMyRenotes " ) )
except ( TypeError , ValueError ) as err :
include MyR enotes = False
include _my_r enotes = check_str_to_bool ( config . get ( " markov " , " includeMyRenotes " ) )
except ( TypeError , ValueError ) :
include _my_r enotes = False
try :
exclude N sfw = check_str_to_bool ( config . get ( " markov " , " excludeNsfw " ) )
except ( TypeError , ValueError ) as err :
exclude N sfw = True
exclude _n sfw = check_str_to_bool ( config . get ( " markov " , " excludeNsfw " ) )
except ( TypeError , ValueError ) :
exclude _n sfw = True
run = True
oldnote = " "
while run :
if ( ( init and len ( notes L ist) > = min_notes ) or ( oldnote == note id) ) :
if ( init and len ( notes _l ist) > = min_notes ) or ( oldnote == note _ id) :
break
try :
req = requests . post ( " https:// " + instance + " /api/users/notes " , json = {
if not init : # sinceid should only be used when updating the database so the json object has to be parsed every time
api_json = {
" userId " : userid ,
" includeReplies " : include_replies ,
" limit " : 100 ,
" includeMyRenotes " : include_my_renotes ,
" withFiles " : False ,
" excludeNsfw " : exclude_nsfw ,
" untilId " : note_id ,
" sinceId " : sinceid }
else :
api_json = {
" userId " : userid ,
" includeReplies " : includeReplies ,
" includeReplies " : include _r eplies,
" limit " : 100 ,
" includeMyRenotes " : includeMyRenotes ,
" includeMyRenotes " : include _my_r enotes,
" withFiles " : False ,
" excludeNsfw " : excludeNsfw ,
" untilId " : noteid ,
" sinceId " : sinceid
} )
" excludeNsfw " : exclude_nsfw ,
" untilId " : note_id }
try :
req = requests . post ( " https:// " + instance + " /api/users/notes " , json = api_json )
req . raise_for_status ( )
except requests . exceptions . HTTPError as err :
print ( " Couldn ' t get Posts! " + str ( err ) )
sys . exit ( 1 )
for jsonObj in req . json ( ) :
notes L ist. append ( jsonObj )
if ( len ( notesList ) == 0 ) :
notes _l ist. append ( jsonObj )
if len ( notes_list ) == 0 :
print ( " No new notes to load! " )
return [ ]
oldnote = note id
oldnote = note _ id
note id = notesList [ len ( notesL ist) - 1 ] [ " id " ]
note _id = notes_list [ len ( notes_l ist) - 1 ] [ " id " ]
print ( str ( len ( notes L ist) ) + " Notes read. " )
print ( str ( len ( notes _l ist) ) + " Notes read. " )
print ( " Processing notes... " )
for element in notes L ist:
last T ime = element [ " createdAt " ]
lastTimestamp = int ( datetime . timestamp ( datetime . strptime ( last T ime, ' % Y- % m- %d T % H: % M: % S. %f % z ' ) ) * 1000 )
for element in notes _l ist:
last _t ime = element [ " createdAt " ]
lastTimestamp = int ( datetime . timestamp ( datetime . strptime ( last _t ime, ' % Y- % m- %d T % H: % M: % S. %f % z ' ) ) * 1000 )
content = element [ " text " ]
@ -133,29 +146,29 @@ def get_notes(**kwargs):
content = content . replace ( " :: " , " : : " ) # Break long emoji chains
content = content . replace ( " @ " , " @ " + chr ( 8203 ) )
dict = { " id " : element [ " id " ] , " text " : content , " timestamp " : lastTimestamp , " user_id " : userid }
return List. append ( dict)
note_ dict = { " id " : element [ " id " ] , " text " : content , " timestamp " : lastTimestamp , " user_id " : userid }
return _list. append ( note_ dict)
return return L ist
return return _l ist
def calculate_markov_chain ( ) :
text = " "
# Load configuration
config = configparser . ConfigParser ( )
config . read ( ( Path ( __file__ ) . parent ) . joinpath ( ' bot.cfg ' ) )
config . read ( Path ( __file__ ) . parent . joinpath ( ' bot.cfg ' ) )
try :
max_notes = config . get ( " markov " , " max_notes " )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
max_notes = " 10000 "
databasepath = ( Path ( __file__ ) . parent ) . joinpath ( ' roboduck.db ' )
if ( not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) ) :
databasepath = Path ( __file__ ) . parent . joinpath ( ' roboduck.db ' )
if not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) :
print ( " Roboduck database not already created! " )
print ( " Exit initialization! " )
sys . exit ( 0 )
with open ( databasepath , ' r ' , encoding = ' utf-8 ' ) as emojilist :
with open ( databasepath , ' r ' , encoding = ' utf-8 ' ) :
database = sqlite3 . connect ( databasepath )
data = database . cursor ( )
@ -171,27 +184,27 @@ def calculate_markov_chain():
markov_json = markovchain . to_json ( )
with open ( ( Path ( __file__ ) . parent ) . joinpath ( ' markov.json ' ) , " w " , encoding = " utf-8 " ) as markov :
with open ( Path ( __file__ ) . parent . joinpath ( ' markov.json ' ) , " w " , encoding = " utf-8 " ) as markov :
json . dump ( markov_json , markov )
def clean_database ( ) :
databasepath = ( Path ( __file__ ) . parent ) . joinpath ( ' roboduck.db ' )
if ( not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) ) :
databasepath = Path ( __file__ ) . parent . joinpath ( ' roboduck.db ' )
if not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) :
print ( " No database found! " )
print ( " Please run Bot first! " )
sys . exit ( 0 )
with open ( databasepath , " a " , encoding = " utf-8 " ) as f :
with open ( databasepath , " a " , encoding = " utf-8 " ) :
database = sqlite3 . connect ( databasepath )
# Reading config file bot.cfg with config parser
config = configparser . ConfigParser ( )
config . read ( ( Path ( __file__ ) . parent ) . joinpath ( ' bot.cfg ' ) )
config . read ( Path ( __file__ ) . parent . joinpath ( ' bot.cfg ' ) )
# print((Path(__file__).parent).joinpath('bot.cfg'))
try :
max_notes = config . get ( " markov " , " max_notes " )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
max_notes = " 10000 "
for user in config . get ( " misskey " , " users " ) . split ( " ; " ) :
@ -199,64 +212,64 @@ def clean_database():
instance = user . split ( " @ " ) [ 2 ]
userid = get_user_id ( username , instance )
data = database . cursor ( )
data . execute ( " DELETE FROM notes WHERE user_id=:user_id AND id NOT IN (SELECT id FROM notes WHERE user_id=:user_id ORDER BY timestamp DESC LIMIT :max ); " , { " user_id " : userid , " max " : int ( max_notes ) } )
data . execute (
" DELETE FROM notes WHERE user_id=:user_id AND id NOT IN (SELECT id FROM notes WHERE user_id=:user_id ORDER BY timestamp DESC LIMIT :max ); " ,
{ " user_id " : userid , " max " : int ( max_notes ) } )
database . commit ( )
database . close ( )
def create_sentence ( ) :
with open ( ( os . path . join ( ( Path ( __file__ ) . parent ) , ' markov.json ' ) ) , " r " , encoding = " utf-8 " ) as markov :
with open ( ( os . path . join ( Path ( __file__ ) . parent , ' markov.json ' ) ) , " r " , encoding = " utf-8 " ) as markov :
markov_json = json . load ( markov )
text_model = markovify . Text . from_json ( markov_json )
note = " "
# Reading config file bot.cfg with config parser
config = configparser . ConfigParser ( )
config . read ( ( Path ( __file__ ) . parent ) . joinpath ( ' bot.cfg ' ) )
config . read ( Path ( __file__ ) . parent . joinpath ( ' bot.cfg ' ) )
# print((Path(__file__).parent).joinpath('bot.cfg'))
# Read & Sanitize Inputs
try :
test_output = check_str_to_bool ( config . get ( " markov " , " test_output " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("test_output: " + str(err))
test_output = True
if ( test_output ) :
if test_output :
try :
tries = int ( config . get ( " markov " , " tries " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("tries: " + str(err))
tries = 250
try :
max_overlap_ratio = float ( config . get ( " markov " , " max_overlap_ratio " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("max_overlap_ratio: " + str(err))
max_overlap_ratio = 0.7
try :
max_overlap_total = int ( config . get ( " markov " , " max_overlap_total " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("max_overlap_total: " + str(err))
max_overlap_total = 10
try :
max_words = int ( config . get ( " markov " , " max_words " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("max_words: " + str(err))
max_words = None
try :
min_words = int ( config . get ( " markov " , " min_words " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print("min_words: " + str(err))
min_words = None
if ( max_words is not None and min_words is not None ) :
if ( min_words > = max_words ) :
if max_words is not None and min_words is not None :
if min_words > = max_words :
# print("min_words ("+str(min_words)+") bigger than max_words ("+str(max_words)+")! Swapping values!")
swap = min_words
min_words = max_words
@ -289,7 +302,7 @@ def create_sentence():
max_words = max_words ,
min_words = min_words
)
if ( note is not None ) :
if note is not None :
return note
else :
return " Error in markov chain sentence creation: Couldn ' t calculate sentence! \n \n ☹ Please try again! "
@ -297,35 +310,38 @@ def create_sentence():
def update ( ) :
notesList = [ ]
databasepath = ( Path ( __file__ ) . parent ) . joinpath ( ' roboduck.db ' )
if ( not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) ) :
databasepath = Path ( __file__ ) . parent . joinpath ( ' roboduck.db ' )
if not ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) :
print ( " No database found! " )
print ( " Please run Bot first! " )
sys . exit ( 0 )
with open ( databasepath , " a " , encoding = " utf-8 " ) as f :
with open ( databasepath , " a " , encoding = " utf-8 " ) :
database = sqlite3 . connect ( databasepath )
print ( " Connected to roboduck.db succesfull... " )
config = configparser . ConfigParser ( )
config . read ( ( Path ( __file__ ) . parent ) . joinpath ( ' bot.cfg ' ) )
for user in config . get ( " misskey " , " users " ) . split ( " ; " ) :
config . read ( Path ( __file__ ) . parent . joinpath ( ' bot.cfg ' ) )
for user in config . get ( " misskey " , " users " ) . split ( " ; " ) :
username = user . split ( " @ " ) [ 1 ]
instance = user . split ( " @ " ) [ 2 ]
userid = get_user_id ( username , instance )
data = database . cursor ( )
data . execute ( " SELECT id FROM notes WHERE timestamp = (SELECT MAX(timestamp) FROM notes WHERE user_id=:user_id) AND user_id=:user_id; " , { " user_id " : userid } )
data . execute (
" SELECT id FROM notes WHERE timestamp = (SELECT MAX(timestamp) FROM notes WHERE user_id=:user_id) AND user_id=:user_id; " ,
{ " user_id " : userid } )
sinceNote = data . fetchone ( ) [ 0 ]
notesList . extend ( get_notes ( lastnote = sinceNote , username = username , instance = instance ) )
if ( notesList == 0 ) :
if notesList == 0 :
database . close ( )
return
print ( " Insert new notes to database... " )
database . executemany ( " INSERT OR IGNORE INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?) " , [ ( note [ " id " ] , note [ " text " ] , note [ " timestamp " ] , note [ " user_id " ] ) for note in notesList ] )
database . executemany ( " INSERT OR IGNORE INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?) " ,
[ ( note [ " id " ] , note [ " text " ] , note [ " timestamp " ] , note [ " user_id " ] ) for note in notesList ] )
database . commit ( )
print ( " Notes updated! " )
@ -346,16 +362,15 @@ def update():
def init_bot ( ) :
notesList = [ ]
databasepath = ( Path ( __file__ ) . parent ) . joinpath ( ' roboduck.db ' )
if ( os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 ) :
databasepath = Path ( __file__ ) . parent . joinpath ( ' roboduck.db ' )
if os . path . exists ( databasepath ) and os . stat ( databasepath ) . st_size != 0 :
print ( " Roboduck database already created! " )
print ( " Exit initialization! " )
sys . exit ( 0 )
print ( " Creating database... " )
with open ( databasepath , " w+ " , encoding = " utf-8 " ) as f :
with open ( databasepath , " w+ " , encoding = " utf-8 " ) :
database = sqlite3 . connect ( databasepath )
print ( " Connected to roboduck.db succesfull... " )
@ -366,22 +381,22 @@ def init_bot():
# Load configuration
config = configparser . ConfigParser ( )
config . read ( ( Path ( __file__ ) . parent ) . joinpath ( ' bot.cfg ' ) )
config . read ( Path ( __file__ ) . parent . joinpath ( ' bot.cfg ' ) )
try :
initnotes = int ( config . get ( " markov " , " min_notes " ) )
except ( TypeError , ValueError ) as err :
except ( TypeError , ValueError ) :
# print(err)
initnotes = 1000
for user in config . get ( " misskey " , " users " ) . split ( " ; " ) :
for user in config . get ( " misskey " , " users " ) . split ( " ; " ) :
print ( " Try reading first " + str ( initnotes ) + " notes for " + user + " . " )
notesList = get_notes ( min_notes = initnotes , username = user . split ( " @ " ) [ 1 ] , instance = user . split ( " @ " ) [ 2 ] )
print ( " Writing notes into database... " )
database . executemany ( " INSERT INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?) " , [ ( note [ " id " ] , note [ " text " ] , note [ " timestamp " ] , note [ " user_id " ] ) for note in notesList ] )
database . executemany ( " INSERT INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?) " ,
[ ( note [ " id " ] , note [ " text " ] , note [ " timestamp " ] , note [ " user_id " ] ) for note in notesList ] )
database . commit ( )
database . close ( )