fedi-block-api/api.py

160 lines
6.9 KiB
Python
Raw Permalink Normal View History

2022-04-22 18:56:58 +00:00
import uvicorn
from fastapi import FastAPI, Request, HTTPException, responses, Query
2022-03-31 21:09:13 +00:00
import sqlite3
2022-04-22 12:58:23 +00:00
from hashlib import sha256
2022-04-22 17:49:23 +00:00
from fastapi.templating import Jinja2Templates
from requests import get
2022-04-22 18:56:58 +00:00
from json import loads
2022-11-29 19:36:46 +00:00
from re import sub
2022-11-29 23:36:35 +00:00
from datetime import datetime
2022-03-31 21:09:13 +00:00
2022-04-22 18:56:58 +00:00
# Runtime settings are read once at import time from config.json in the
# current working directory.
with open("config.json") as f:
    config = loads(f.read())

# URL prefix under which every route is mounted, and the port the server
# listens on (also used for loopback calls between routes).
port = config["port"]
base_url = config["base_url"]

# Templates are looked up in the current working directory.
templates = Jinja2Templates(directory=".")

# The interactive API docs are served under the same prefix as the routes.
app = FastAPI(
    docs_url=base_url+"/docs",
    redoc_url=base_url+"/redoc",
)
2022-03-31 21:09:13 +00:00
2022-04-22 12:58:23 +00:00
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* encoded as UTF-8."""
    digest = sha256(domain.encode("utf-8"))
    return digest.hexdigest()
2022-03-31 21:09:13 +00:00
@app.get(base_url+"/info")
def info():
    """Return aggregate counts from the block database.

    Returns:
        A dict with three integer entries:
        ``known_instances`` (count of ``domain`` values in ``instances``),
        ``indexed_instances`` (the same count restricted to the software
        names listed in the query) and ``blocks_recorded`` (count of
        ``blocker`` values in ``blocks``).
    """
    conn = sqlite3.connect("blocks.db")
    try:
        c = conn.cursor()
        c.execute("select (select count(domain) from instances), (select count(domain) from instances where software in ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (select count(blocker) from blocks)")
        known, indexed, blocks = c.fetchone()
        c.close()
    finally:
        # Closing the cursor alone does not release the connection; close it
        # explicitly so each request does not leak a database handle.
        conn.close()
    return {
        "known_instances": known,
        "indexed_instances": indexed,
        "blocks_recorded": blocks,
    }
2022-11-29 23:36:35 +00:00
@app.get(base_url+"/top")
def top(blocked: int = None, blockers: int = None):
    """Return a ranking of domains by number of ``reject`` blocks.

    Args:
        blocked: If given, rank the most-blocked domains and return at most
            this many rows. Takes precedence when both filters are supplied.
        blockers: If given, rank the domains that block the most and return
            at most this many rows.

    Returns:
        A list of ``{"domain": ..., "highscore": ...}`` dicts ordered by
        descending count.

    Raises:
        HTTPException: 400 when no filter is given, when the requested limit
            exceeds 500, or when it is negative.
    """
    if blocked is None and blockers is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    if blocked is not None:
        limit = blocked
        query = "select blocked, count(blocked) from blocks where block_level = 'reject' group by blocked order by count(blocked) desc limit ?"
    else:
        limit = blockers
        query = "select blocker, count(blocker) from blocks where block_level = 'reject' group by blocker order by count(blocker) desc limit ?"

    if limit > 500:
        raise HTTPException(status_code=400, detail="Too many results")
    if limit < 0:
        # SQLite treats a negative LIMIT as "no upper bound", which would
        # silently bypass the 500-row cap above.
        raise HTTPException(status_code=400, detail="Limit must not be negative")

    # Open the connection only after validation so rejected requests never
    # touch the database, and always close it afterwards.
    conn = sqlite3.connect("blocks.db")
    try:
        c = conn.cursor()
        c.execute(query, (limit,))
        scores = c.fetchall()
        c.close()
    finally:
        conn.close()

    return [{"domain": domain, "highscore": highscore} for domain, highscore in scores]
2022-04-22 17:49:23 +00:00
@app.get(base_url+"/api")
def blocked(domain: str = None, reason: str = None, reverse: str = None):
    """Look up recorded blocks by blocked domain, blocker, or reason keyword.

    Exactly one lookup is performed; ``domain`` takes precedence over
    ``reverse``, which takes precedence over ``reason``.

    Args:
        domain: Return blocks whose ``blocked`` value matches this domain in
            any of the forms built below (plain, wildcard, hashed, punycode).
        reason: Return blocks whose non-empty reason contains this keyword.
            ``%`` and ``_`` are stripped so the caller cannot inject LIKE
            wildcards; at least three characters must remain.
        reverse: Return blocks issued by this blocker domain.

    Returns:
        A dict mapping each block level to a list of entries with the keys
        ``blocker``, ``blocked``, ``reason``, ``first_added`` and
        ``last_seen``, ordered by ``first_added`` ascending.

    Raises:
        HTTPException: 400 when no filter is given or the cleaned reason
            keyword is shorter than three characters.
    """
    if domain is None and reason is None and reverse is None:
        raise HTTPException(status_code=400, detail="No filter specified")
    if reason is not None:
        reason = sub("(%|_)", "", reason)
        if len(reason) < 3:
            raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")

    if domain is not None:
        # Wildcard for the parent domain: "a.b.c" -> "*.b.c". For a name
        # without dots this is the same as "*." + domain.
        wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
        try:
            punycode = domain.encode('idna').decode('utf-8')
        except UnicodeError:
            # Malformed names (empty or over-long labels, characters IDNA
            # rejects) cannot be converted; fall back to the raw value rather
            # than failing the whole request with a server error.
            punycode = domain
        query = "select blocker, blocked, block_level, reason, first_added, last_seen from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? order by first_added asc"
        params = (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode)
    elif reverse is not None:
        query = "select blocker, blocked, block_level, reason, first_added, last_seen from blocks where blocker = ? order by first_added asc"
        params = (reverse,)
    else:
        query = "select blocker, blocked, block_level, reason, first_added, last_seen from blocks where reason like ? and reason != '' order by first_added asc"
        params = ("%"+reason+"%",)

    conn = sqlite3.connect("blocks.db")
    try:
        c = conn.cursor()
        c.execute(query, params)
        rows = c.fetchall()
        c.close()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    # Group rows by block level. Distinct local names avoid shadowing this
    # function and its ``reason`` parameter.
    result = {}
    for blocker, blocked_domain, block_level, block_reason, first_added, last_seen in rows:
        entry = {
            "blocker": blocker,
            "blocked": blocked_domain,
            "reason": block_reason,
            "first_added": first_added,
            "last_seen": last_seen,
        }
        result.setdefault(block_level, []).append(entry)
    return result
@app.get(base_url+"/scoreboard")
def index(request: Request, blockers: int = None, blocked: int = None):
    """Render the scoreboard page from this service's own ``/top`` endpoint.

    Args:
        request: The incoming request, passed through to the template.
        blockers: Number of top blockers to show. Takes precedence when both
            filters are supplied.
        blocked: Number of most-blocked domains to show.

    Returns:
        The rendered ``index.html`` template with the scoreboard data.

    Raises:
        HTTPException: 400 when no filter is given; otherwise the status and
            body of the ``/top`` response when that call fails.
    """
    if blockers is None and blocked is None:
        raise HTTPException(status_code=400, detail="No filter specified")
    elif blockers is not None:
        response = get(f"http://127.0.0.1:{port}{base_url}/top?blockers={blockers}")
    else:
        response = get(f"http://127.0.0.1:{port}{base_url}/top?blocked={blocked}")

    if not response.ok:
        # Propagate the upstream failure. This previously referenced an
        # undefined name and crashed with NameError instead.
        raise HTTPException(status_code=response.status_code, detail=response.text)
    scores = response.json()
    return templates.TemplateResponse("index.html", {"request": request, "scoreboard": True, "blockers": blockers, "blocked": blocked, "scores": scores})
2022-03-31 21:09:13 +00:00
2022-04-22 17:49:23 +00:00
@app.get(base_url+"/")
def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
    """Render the landing page, optionally with search results.

    With no filter the page shows the counts from this service's ``/info``
    endpoint. With a filter it shows the matching blocks from ``/api``;
    ``domain`` takes precedence over ``reason``, which takes precedence over
    ``reverse``.

    Args:
        request: The incoming request, passed through to the template.
        domain: Blocked domain to search for.
        reason: Keyword to search for in block reasons.
        reverse: Blocker domain whose blocks should be listed.

    Returns:
        The rendered ``index.html`` template, or a redirect to the landing
        page when any filter is present but empty.

    Raises:
        HTTPException: With the status and body of the internal ``/info`` or
            ``/api`` response when that call fails.
    """
    if domain == "" or reason == "" or reverse == "":
        # Redirect to this application's own landing route, which is mounted
        # under base_url like every other route.
        return responses.RedirectResponse(base_url + "/")

    info = None
    blocks = None
    api_url = f"http://127.0.0.1:{port}{base_url}/api"
    if domain is None and reason is None and reverse is None:
        info = get(f"http://127.0.0.1:{port}{base_url}/info")
        if not info.ok:
            raise HTTPException(status_code=info.status_code, detail=info.text)
        info = info.json()
    # Pass user input through ``params`` so it is URL-encoded; interpolating
    # it into the URL lets characters such as "&", "#" or "+" corrupt the
    # query or inject extra parameters.
    elif domain is not None:
        blocks = get(api_url, params={"domain": domain})
    elif reason is not None:
        blocks = get(api_url, params={"reason": reason})
    elif reverse is not None:
        blocks = get(api_url, params={"reverse": reverse})

    if blocks is not None:
        if not blocks.ok:
            raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
        blocks = blocks.json()
        # Convert the numeric timestamps into UTC display strings.
        for block_level in blocks:
            for block in blocks[block_level]:
                block["first_added"] = datetime.utcfromtimestamp(block["first_added"]).strftime('%Y-%m-%d %H:%M')
                block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')

    return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "reverse": reverse, "info": info})
2022-04-22 18:56:58 +00:00
@app.get(base_url+"/api/mutual")
def mutual(domains: list[str] = Query()):
    """Return 200 if federation is open between the two, 4xx otherwise.

    Args:
        domains: The two domains to check, taken from the repeated
            ``domains`` query parameter. Only the first two values are used.

    Returns:
        An empty JSON response with status 200 when no ``reject`` block
        between the two domains is recorded, or status 418 when one is.

    Raises:
        HTTPException: 400 when fewer than two domains are supplied.
    """
    if len(domains) < 2:
        # Without this check, indexing domains[1] raises IndexError and the
        # client receives a 500 instead of a 4xx.
        raise HTTPException(status_code=400, detail="Two domains are required")

    conn = sqlite3.connect('blocks.db')
    try:
        c = conn.cursor()
        c.execute(
            "SELECT block_level FROM blocks " \
            "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
            "AND block_level = 'reject' " \
            "LIMIT 1",
            {
                "a": domains[0],
                "b": domains[1],
                "aw": "*." + domains[0],
                "bw": "*." + domains[1],
            },
        )
        res = c.fetchone()
        c.close()
    finally:
        # Release the connection even if the query raises.
        conn.close()

    if res is not None:
        # Blocks found
        return responses.JSONResponse(status_code=418, content={})
    # No known blocks
    return responses.JSONResponse(status_code=200, content={})
2022-04-22 18:56:58 +00:00
# Start the server on the loopback interface only when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    uvicorn.run(
        "api:app",
        host="127.0.0.1",
        port=port,
        log_level="info",
    )