diff --git a/Makefile b/Makefile index 3570366..576f592 100644 --- a/Makefile +++ b/Makefile @@ -27,11 +27,6 @@ reload-dev: docker build . -t microblogpub:latest docker-compose -f docker-compose-dev.yml up -d --force-recreate -# Build the poussetaches Docker image -.PHONY: poussetaches -poussetaches: - git clone https://github.com/tsileo/poussetaches.git pt && cd pt && docker build . -t poussetaches:latest && cd - && rm -rf pt - # Build the microblogpub Docker image .PHONY: microblogpub microblogpub: @@ -42,10 +37,11 @@ microblogpub: # Run the docker-compose project locally (will perform a update if the project is already running) .PHONY: run -run: poussetaches microblogpub +run: microblogpub # (poussetaches and microblogpub Docker image will updated) # Update MongoDB docker pull mongo + docker pull poussetaches/poussetaches # Restart the project docker-compose stop docker-compose up -d --force-recreate --build diff --git a/api.py b/api.py deleted file mode 100644 index bcd39dd..0000000 --- a/api.py +++ /dev/null @@ -1,132 +0,0 @@ -from functools import wraps - -import flask -from flask import abort -from flask import current_app as app -from flask import redirect -from flask import request -from flask import session -from itsdangerous import BadSignature -from little_boxes import activitypub as ap -from little_boxes.errors import NotFromOutboxError - -from app_utils import MY_PERSON -from app_utils import csrf -from app_utils import post_to_outbox -from config import ID -from config import JWT -from utils import now - -api = flask.Blueprint("api", __name__) - - -def _api_required() -> None: - if session.get("logged_in"): - if request.method not in ["GET", "HEAD"]: - # If a standard API request is made with a "login session", it must havw a CSRF token - csrf.protect() - return - - # Token verification - token = request.headers.get("Authorization", "").replace("Bearer ", "") - if not token: - # IndieAuth token - token = request.form.get("access_token", "") - - # Will raise a BadSignature on bad auth - payload = JWT.loads(token) - app.logger.info(f"api call by {payload}") - - -def api_required(f): - @wraps(f) - def decorated_function(*args, **kwargs): - try: - _api_required() - except BadSignature: - abort(401) - - return f(*args, **kwargs) - - return decorated_function - - -def _user_api_arg(key: str, **kwargs): - """Try to get the given key from the requests, try JSON body, form data and query arg.""" - if request.is_json: - oid = request.json.get(key) - else: - oid = request.args.get(key) or request.form.get(key) - - if not oid: - if "default" in kwargs: - app.logger.info(f'{key}={kwargs.get("default")}') - return kwargs.get("default") - - raise ValueError(f"missing {key}") - - app.logger.info(f"{key}={oid}") - return oid - - -def _user_api_get_note(from_outbox: bool = False): - oid = _user_api_arg("id") - app.logger.info(f"fetching {oid}") - note = ap.parse_activity(ap.get_backend().fetch_iri(oid)) - if from_outbox and not note.id.startswith(ID): - raise NotFromOutboxError( - f"cannot load {note.id}, id must be owned by the server" - ) - - return note - - -def _user_api_response(**kwargs): - _redirect = _user_api_arg("redirect", default=None) - if _redirect: - return redirect(_redirect) - - resp = flask.jsonify(**kwargs) - resp.status_code = 201 - return resp - - -@api.route("/note/delete", methods=["POST"]) -@api_required -def api_delete(): - """API endpoint to delete a Note activity.""" - note = _user_api_get_note(from_outbox=True) - - # Create the delete, same audience as the Create 
object - delete = ap.Delete( - actor=ID, - object=ap.Tombstone(id=note.id).to_dict(embed=True), - to=note.to, - cc=note.cc, - published=now(), - ) - - delete_id = post_to_outbox(delete) - - return _user_api_response(activity=delete_id) - - -@api.route("/boost", methods=["POST"]) -@api_required -def api_boost(): - note = _user_api_get_note() - - # Ensures the note visibility allow us to build an Announce (in respect to the post visibility) - if ap.get_visibility(note) not in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: - abort(400) - - announce = ap.Announce( - actor=MY_PERSON.id, - object=note.id, - to=[MY_PERSON.followers, note.attributedTo], - cc=[ap.AS_PUBLIC], - published=now(), - ) - announce_id = post_to_outbox(announce) - - return _user_api_response(activity=announce_id) diff --git a/app.py b/app.py index fbb2982..df26086 100644 --- a/app.py +++ b/app.py @@ -1,27 +1,17 @@ -import binascii import json import logging -import mimetypes import os import traceback from datetime import datetime -from datetime import timedelta -from datetime import timezone -from functools import wraps -from io import BytesIO from typing import Any from typing import Dict -from urllib.parse import urlencode from urllib.parse import urlparse -import mf2py -import requests from bson.objectid import ObjectId from flask import Flask from flask import Response from flask import abort from flask import jsonify as flask_jsonify -from flask import make_response from flask import redirect from flask import render_template from flask import request @@ -31,75 +21,60 @@ from itsdangerous import BadSignature from little_boxes import activitypub as ap from little_boxes.activitypub import ActivityType from little_boxes.activitypub import clean_activity -from little_boxes.activitypub import format_datetime from little_boxes.activitypub import get_backend -from little_boxes.content_helper import parse_markdown from little_boxes.errors import ActivityGoneError -from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import Error -from little_boxes.errors import NotAnActivityError -from little_boxes.errors import NotFromOutboxError from little_boxes.httpsig import HTTPSigAuth from little_boxes.httpsig import verify_request from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_remote_follow_template -from passlib.hash import bcrypt -from requests.exceptions import HTTPError from u2flib_server import u2f -from werkzeug.utils import secure_filename -import activity_gc import activitypub +import blueprints.admin +import blueprints.tasks +import blueprints.well_known import config from activitypub import Box -from activitypub import _answer_key from activitypub import embed_collection -from api import api from app_utils import MY_PERSON +from app_utils import _add_answers_to_question +from app_utils import _build_thread +from app_utils import _get_ip from app_utils import back from app_utils import csrf +from app_utils import login_required +from app_utils import noindex +from app_utils import paginated_query +from app_utils import post_to_outbox +from blueprints.api import _api_required from config import ADMIN_API_KEY -from config import BASE_URL from config import BLACKLIST from config import DB -from config import DEBUG_MODE -from config import DOMAIN -from config import EMOJIS from config import HEADERS -from config import ICON_URL from config import ID -from config import JWT from config import KEY from config import ME from config import MEDIA_CACHE -from config 
import PASS -from config import USER_AGENT -from config import USERNAME from config import VERSION -from config import VERSION_DATE from config import MetaKey -from config import _drop_db from config import _meta -from poussetaches import PousseTaches +from indieauth import indieauth from tasks import Tasks from utils import now -from utils import opengraph from utils.key import get_secret_key -from utils.lookup import lookup -from utils.notifications import set_inbox_flags from utils.template_filters import filters -p = PousseTaches( - os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"), - os.getenv("MICROBLOGPUB_INTERNAL_HOST", "http://localhost:5000"), -) - # p = PousseTaches("http://localhost:7991", "http://localhost:5000") app = Flask(__name__) app.secret_key = get_secret_key("flask") app.register_blueprint(filters) -app.register_blueprint(api, url_prefix="/api") +app.register_blueprint(blueprints.admin.blueprint) +app.register_blueprint(blueprints.api.blueprint, url_prefix="/api") +app.register_blueprint(indieauth) +app.register_blueprint(blueprints.tasks.blueprint) +app.register_blueprint(blueprints.well_known.blueprint) app.config.update(WTF_CSRF_CHECK_DEFAULT=False) csrf.init_app(app) @@ -127,10 +102,6 @@ def is_blacklisted(url: str) -> bool: return False -def verify_pass(pwd): - return bcrypt.verify(pwd, PASS) - - @app.context_processor def inject_config(): q = { @@ -195,69 +166,6 @@ def set_x_powered_by(response): return response -def add_response_headers(headers={}): - """This decorator adds the headers passed in to the response""" - - def decorator(f): - @wraps(f) - def decorated_function(*args, **kwargs): - resp = make_response(f(*args, **kwargs)) - h = resp.headers - for header, value in headers.items(): - h[header] = value - return resp - - return decorated_function - - return decorator - - -def noindex(f): - """This decorator passes X-Robots-Tag: noindex, nofollow""" - return add_response_headers({"X-Robots-Tag": "noindex, nofollow"})(f) - - -def login_required(f): - @wraps(f) - def decorated_function(*args, **kwargs): - if not session.get("logged_in"): - return redirect(url_for("admin_login", next=request.url)) - return f(*args, **kwargs) - - return decorated_function - - -def _api_required(): - if session.get("logged_in"): - if request.method not in ["GET", "HEAD"]: - # If a standard API request is made with a "login session", it must havw a CSRF token - csrf.protect() - return - - # Token verification - token = request.headers.get("Authorization", "").replace("Bearer ", "") - if not token: - # IndieAuth token - token = request.form.get("access_token", "") - - # Will raise a BadSignature on bad auth - payload = JWT.loads(token) - logger.info(f"api call by {payload}") - - -def api_required(f): - @wraps(f) - def decorated_function(*args, **kwargs): - try: - _api_required() - except BadSignature: - abort(401) - - return f(*args, **kwargs) - - return decorated_function - - def jsonify(**data): if "@context" not in data: data["@context"] = config.DEFAULT_CTX @@ -271,23 +179,6 @@ def jsonify(**data): ) -def _get_ip(): - """Guess the IP address from the request. Only used for security purpose (failed logins or bad payload). - - Geoip will be returned if the "broxy" headers are set (it does Geoip - using an offline database and append these special headers). 
- """ - ip = request.headers.get("X-Forwarded-For", request.remote_addr) - geoip = None - if request.headers.get("Broxy-Geoip-Country"): - geoip = ( - request.headers.get("Broxy-Geoip-Country") - + "/" - + request.headers.get("Broxy-Geoip-Region") - ) - return ip, geoip - - def is_api_request(): h = request.headers.get("Accept") if h is None: @@ -393,75 +284,6 @@ def serve_uploads(oid, fname): # Login -@app.route("/admin/update_actor") -@login_required -def admin_update_actor(): - update = ap.Update( - actor=MY_PERSON.id, - object=MY_PERSON.to_dict(), - to=[MY_PERSON.followers], - cc=[ap.AS_PUBLIC], - published=now(), - ) - - post_to_outbox(update) - return "OK" - - -@app.route("/admin/logout") -@login_required -def admin_logout(): - session["logged_in"] = False - return redirect("/") - - -@app.route("/login", methods=["POST", "GET"]) -@noindex -def admin_login(): - if session.get("logged_in") is True: - return redirect(url_for("admin_notifications")) - - devices = [doc["device"] for doc in DB.u2f.find()] - u2f_enabled = True if devices else False - if request.method == "POST": - csrf.protect() - # 1. Check regular password login flow - pwd = request.form.get("pass") - if pwd: - if verify_pass(pwd): - session["logged_in"] = True - return redirect( - request.args.get("redirect") or url_for("admin_notifications") - ) - else: - abort(403) - # 2. Check for U2F payload, if any - elif devices: - resp = json.loads(request.form.get("resp")) - try: - u2f.complete_authentication(session["challenge"], resp) - except ValueError as exc: - print("failed", exc) - abort(403) - return - finally: - session["challenge"] = None - - session["logged_in"] = True - return redirect( - request.args.get("redirect") or url_for("admin_notifications") - ) - else: - abort(401) - - payload = None - if devices: - payload = u2f.begin_authentication(ID, devices) - session["challenge"] = payload - - return render_template("login.html", u2f_enabled=u2f_enabled, payload=payload) - - @app.route("/remote_follow", methods=["GET", "POST"]) def remote_follow(): if request.method == "GET": @@ -529,42 +351,6 @@ def drop_cache(): return "Done" -def paginated_query(db, q, limit=25, sort_key="_id"): - older_than = newer_than = None - query_sort = -1 - first_page = not request.args.get("older_than") and not request.args.get( - "newer_than" - ) - - query_older_than = request.args.get("older_than") - query_newer_than = request.args.get("newer_than") - - if query_older_than: - q["_id"] = {"$lt": ObjectId(query_older_than)} - elif query_newer_than: - q["_id"] = {"$gt": ObjectId(query_newer_than)} - query_sort = 1 - - outbox_data = list(db.find(q, limit=limit + 1).sort(sort_key, query_sort)) - outbox_len = len(outbox_data) - outbox_data = sorted( - outbox_data[:limit], key=lambda x: str(x[sort_key]), reverse=True - ) - - if query_older_than: - newer_than = str(outbox_data[0]["_id"]) - if outbox_len == limit + 1: - older_than = str(outbox_data[-1]["_id"]) - elif query_newer_than: - older_than = str(outbox_data[-1]["_id"]) - if outbox_len == limit + 1: - newer_than = str(outbox_data[0]["_id"]) - elif first_page and outbox_len == limit + 1: - older_than = str(outbox_data[-1]["_id"]) - - return outbox_data, older_than, newer_than - - @app.route("/") def index(): if is_api_request(): @@ -628,76 +414,6 @@ def all(): ) -def _build_thread(data, include_children=True): # noqa: C901 - data["_requested"] = True - app.logger.info(f"_build_thread({data!r})") - root_id = data["meta"].get("thread_root_parent", data["activity"]["object"]["id"]) - - query = { 
- "$or": [{"meta.thread_root_parent": root_id}, {"activity.object.id": root_id}], - "meta.deleted": False, - } - replies = [data] - for dat in DB.activities.find(query): - print(dat["type"]) - if dat["type"][0] == ActivityType.CREATE.value: - replies.append(dat) - if dat["type"][0] == ActivityType.UPDATE.value: - continue - else: - # Make a Note/Question/... looks like a Create - dat = { - "activity": {"object": dat["activity"]}, - "meta": dat["meta"], - "_id": dat["_id"], - } - replies.append(dat) - - replies = sorted(replies, key=lambda d: d["activity"]["object"]["published"]) - - # Index all the IDs in order to build a tree - idx = {} - replies2 = [] - for rep in replies: - rep_id = rep["activity"]["object"]["id"] - if rep_id in idx: - continue - idx[rep_id] = rep.copy() - idx[rep_id]["_nodes"] = [] - replies2.append(rep) - - # Build the tree - for rep in replies2: - rep_id = rep["activity"]["object"]["id"] - if rep_id == root_id: - continue - reply_of = ap._get_id(rep["activity"]["object"].get("inReplyTo")) - try: - idx[reply_of]["_nodes"].append(rep) - except KeyError: - app.logger.info(f"{reply_of} is not there! skipping {rep}") - - # Flatten the tree - thread = [] - - def _flatten(node, level=0): - node["_level"] = level - thread.append(node) - - for snode in sorted( - idx[node["activity"]["object"]["id"]]["_nodes"], - key=lambda d: d["activity"]["object"]["published"], - ): - _flatten(snode, level=level + 1) - - try: - _flatten(idx[root_id]) - except KeyError: - app.logger.info(f"{root_id} is not there! skipping") - - return thread - - @app.route("/note/") def note_by_id(note_id): if is_api_request(): @@ -762,96 +478,6 @@ def note_by_id(note_id): ) -@app.route("/nodeinfo") -def nodeinfo(): - q = { - "box": Box.OUTBOX.value, - "meta.deleted": False, - "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]}, - } - - response = json.dumps( - { - "version": "2.1", - "software": { - "name": "microblogpub", - "version": f"{VERSION}", - "repository": "https://github.com/tsileo/microblog.pub", - }, - "protocols": ["activitypub"], - "services": {"inbound": [], "outbound": []}, - "openRegistrations": False, - "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, - "metadata": { - "sourceCode": "https://github.com/tsileo/microblog.pub", - "nodeName": f"@{USERNAME}@{DOMAIN}", - "version": VERSION, - "version_date": VERSION_DATE, - }, - } - ) - - return Response( - headers={ - "Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.1#" - }, - response=response, - ) - - -@app.route("/.well-known/nodeinfo") -def wellknown_nodeinfo(): - return flask_jsonify( - links=[ - { - "rel": "http://nodeinfo.diaspora.software/ns/schema/2.0", - "href": f"{ID}/nodeinfo", - } - ] - ) - - -@app.route("/.well-known/webfinger") -def wellknown_webfinger(): - """Enable WebFinger support, required for Mastodon interopability.""" - # TODO(tsileo): move this to little-boxes? 
- resource = request.args.get("resource") - if resource not in [f"acct:{USERNAME}@{DOMAIN}", ID]: - abort(404) - - out = { - "subject": f"acct:{USERNAME}@{DOMAIN}", - "aliases": [ID], - "links": [ - { - "rel": "http://webfinger.net/rel/profile-page", - "type": "text/html", - "href": BASE_URL, - }, - {"rel": "self", "type": "application/activity+json", "href": ID}, - { - "rel": "http://ostatus.org/schema/1.0/subscribe", - "template": BASE_URL + "/authorize_follow?profile={uri}", - }, - {"rel": "magic-public-key", "href": KEY.to_magic_key()}, - { - "href": ICON_URL, - "rel": "http://webfinger.net/rel/avatar", - "type": mimetypes.guess_type(ICON_URL)[0], - }, - ], - } - - return Response( - response=json.dumps(out), - headers={ - "Content-Type": "application/jrd+json; charset=utf-8" - if not app.debug - else "application/json" - }, - ) - - def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]: if raw_doc["activity"]["type"] != ActivityType.CREATE.value: return raw_doc @@ -878,25 +504,6 @@ def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]: return activity -def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None: - activity = raw_doc["activity"] - if ( - ap._has_type(activity["type"], ActivityType.CREATE) - and "object" in activity - and ap._has_type(activity["object"]["type"], ActivityType.QUESTION) - ): - for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")): - choice["replies"] = { - "type": ActivityType.COLLECTION.value, - "totalItems": raw_doc["meta"] - .get("question_answers", {}) - .get(_answer_key(choice["name"]), 0), - } - now = datetime.now(timezone.utc) - if format_datetime(now) >= activity["object"]["endTime"]: - activity["object"]["closed"] = activity["object"]["endTime"] - - def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]: raw_doc = add_extra_collection(raw_doc) activity = clean_activity(raw_doc["activity"]) @@ -1104,492 +711,12 @@ def outbox_activity_shares(item_id): ) -@app.route("/admin", methods=["GET"]) -@login_required -def admin(): - q = { - "meta.deleted": False, - "meta.undo": False, - "type": ActivityType.LIKE.value, - "box": Box.OUTBOX.value, - } - col_liked = DB.activities.count(q) - - return render_template( - "admin.html", - instances=list(DB.instances.find()), - inbox_size=DB.activities.count({"box": Box.INBOX.value}), - outbox_size=DB.activities.count({"box": Box.OUTBOX.value}), - col_liked=col_liked, - col_followers=DB.activities.count( - { - "box": Box.INBOX.value, - "type": ActivityType.FOLLOW.value, - "meta.undo": False, - } - ), - col_following=DB.activities.count( - { - "box": Box.OUTBOX.value, - "type": ActivityType.FOLLOW.value, - "meta.undo": False, - } - ), - ) - - -@app.route("/admin/indieauth", methods=["GET"]) -@login_required -def admin_indieauth(): - return render_template( - "admin_indieauth.html", - indieauth_actions=DB.indieauth.find().sort("ts", -1).limit(100), - ) - - -@app.route("/admin/tasks", methods=["GET"]) -@login_required -def admin_tasks(): - return render_template( - "admin_tasks.html", - success=p.get_success(), - dead=p.get_dead(), - waiting=p.get_waiting(), - cron=p.get_cron(), - ) - - -@app.route("/admin/lookup", methods=["GET", "POST"]) -@login_required -def admin_lookup(): - data = None - meta = None - if request.method == "POST": - if request.form.get("url"): - data = lookup(request.form.get("url")) - if data.has_type(ActivityType.ANNOUNCE): - meta = dict( - object=data.get_object().to_dict(), - 
object_actor=data.get_object().get_actor().to_dict(), - actor=data.get_actor().to_dict(), - ) - - elif data.has_type(ActivityType.QUESTION): - p.push(data.id, "/task/fetch_remote_question") - - print(data) - app.logger.debug(data.to_dict()) - return render_template( - "lookup.html", data=data, meta=meta, url=request.form.get("url") - ) - - -@app.route("/admin/thread") -@login_required -def admin_thread(): - data = DB.activities.find_one( - { - "type": ActivityType.CREATE.value, - "activity.object.id": request.args.get("oid"), - } - ) - - if not data: - abort(404) - if data["meta"].get("deleted", False): - abort(410) - thread = _build_thread(data) - - tpl = "note.html" - if request.args.get("debug"): - tpl = "note_debug.html" - return render_template(tpl, thread=thread, note=data) - - -@app.route("/admin/new", methods=["GET"]) -@login_required -def admin_new(): - reply_id = None - content = "" - thread = [] - print(request.args) - if request.args.get("reply"): - data = DB.activities.find_one({"activity.object.id": request.args.get("reply")}) - if data: - reply = ap.parse_activity(data["activity"]) - else: - data = dict( - meta={}, - activity=dict( - object=get_backend().fetch_iri(request.args.get("reply")) - ), - ) - reply = ap.parse_activity(data["activity"]["object"]) - - reply_id = reply.id - if reply.ACTIVITY_TYPE == ActivityType.CREATE: - reply_id = reply.get_object().id - actor = reply.get_actor() - domain = urlparse(actor.id).netloc - # FIXME(tsileo): if reply of reply, fetch all participants - content = f"@{actor.preferredUsername}@{domain} " - thread = _build_thread(data) - - return render_template( - "new.html", - reply=reply_id, - content=content, - thread=thread, - visibility=ap.Visibility, - emojis=EMOJIS.split(" "), - ) - - -@app.route("/admin/lists", methods=["GET"]) -@login_required -def admin_lists(): - lists = list(DB.lists.find()) - - return render_template("lists.html", lists=lists) - - -@app.route("/admin/notifications") -@login_required -def admin_notifications(): - # Setup the cron for deleting old activities - - # FIXME(tsileo): put back to 12h - p.push({}, "/task/cleanup", schedule="@every 1h") - - # Trigger a cleanup if asked - if request.args.get("cleanup"): - p.push({}, "/task/cleanup") - - # FIXME(tsileo): show unfollow (performed by the current actor) and liked??? 
- mentions_query = { - "type": ActivityType.CREATE.value, - "activity.object.tag.type": "Mention", - "activity.object.tag.name": f"@{USERNAME}@{DOMAIN}", - "meta.deleted": False, - } - replies_query = { - "type": ActivityType.CREATE.value, - "activity.object.inReplyTo": {"$regex": f"^{BASE_URL}"}, - "meta.poll_answer": False, - } - announced_query = { - "type": ActivityType.ANNOUNCE.value, - "activity.object": {"$regex": f"^{BASE_URL}"}, - } - new_followers_query = {"type": ActivityType.FOLLOW.value} - unfollow_query = { - "type": ActivityType.UNDO.value, - "activity.object.type": ActivityType.FOLLOW.value, - } - likes_query = { - "type": ActivityType.LIKE.value, - "activity.object": {"$regex": f"^{BASE_URL}"}, - } - followed_query = {"type": ActivityType.ACCEPT.value} - q = { - "box": Box.INBOX.value, - "$or": [ - mentions_query, - announced_query, - replies_query, - new_followers_query, - followed_query, - unfollow_query, - likes_query, - ], - } - inbox_data, older_than, newer_than = paginated_query(DB.activities, q) - if not newer_than: - nstart = datetime.now(timezone.utc).isoformat() - else: - nstart = inbox_data[0]["_id"].generation_time.isoformat() - if not older_than: - nend = (datetime.now(timezone.utc) - timedelta(days=15)).isoformat() - else: - nend = inbox_data[-1]["_id"].generation_time.isoformat() - print(nstart, nend) - notifs = list( - DB.notifications.find({"datetime": {"$lte": nstart, "$gt": nend}}) - .sort("_id", -1) - .limit(50) - ) - print(inbox_data) - - nid = None - if inbox_data: - nid = inbox_data[0]["_id"] - - inbox_data.extend(notifs) - inbox_data = sorted( - inbox_data, reverse=True, key=lambda doc: doc["_id"].generation_time - ) - - return render_template( - "stream.html", - inbox_data=inbox_data, - older_than=older_than, - newer_than=newer_than, - nid=nid, - ) - - @app.route("/api/key") @login_required def api_user_key(): return flask_jsonify(api_key=ADMIN_API_KEY) -def _user_api_arg(key: str, **kwargs): - """Try to get the given key from the requests, try JSON body, form data and query arg.""" - if request.is_json: - oid = request.json.get(key) - else: - oid = request.args.get(key) or request.form.get(key) - - if not oid: - if "default" in kwargs: - app.logger.info(f'{key}={kwargs.get("default")}') - return kwargs.get("default") - - raise ValueError(f"missing {key}") - - app.logger.info(f"{key}={oid}") - return oid - - -def _user_api_get_note(from_outbox: bool = False): - oid = _user_api_arg("id") - app.logger.info(f"fetching {oid}") - note = ap.parse_activity(get_backend().fetch_iri(oid)) - if from_outbox and not note.id.startswith(ID): - raise NotFromOutboxError( - f"cannot load {note.id}, id must be owned by the server" - ) - - return note - - -def _user_api_response(**kwargs): - _redirect = _user_api_arg("redirect", default=None) - if _redirect: - return redirect(_redirect) - - resp = flask_jsonify(**kwargs) - resp.status_code = 201 - return resp - - -@app.route("/api/mark_notifications_as_read", methods=["POST"]) -@api_required -def api_mark_notification_as_read(): - nid = ObjectId(_user_api_arg("nid")) - - DB.activities.update_many( - {_meta(MetaKey.NOTIFICATION_UNREAD): True, "_id": {"$lte": nid}}, - {"$set": {_meta(MetaKey.NOTIFICATION_UNREAD): False}}, - ) - - return _user_api_response() - - -@app.route("/api/vote", methods=["POST"]) -@api_required -def api_vote(): - oid = _user_api_arg("id") - app.logger.info(f"fetching {oid}") - note = ap.parse_activity(get_backend().fetch_iri(oid)) - choice = _user_api_arg("choice") - - raw_note = dict( - 
attributedTo=MY_PERSON.id, - cc=[], - to=note.get_actor().id, - name=choice, - tag=[], - inReplyTo=note.id, - ) - raw_note["@context"] = config.DEFAULT_CTX - - note = ap.Note(**raw_note) - create = note.build_create() - create_id = post_to_outbox(create) - - return _user_api_response(activity=create_id) - - -@app.route("/api/like", methods=["POST"]) -@api_required -def api_like(): - note = _user_api_get_note() - - to = [] - cc = [] - - note_visibility = ap.get_visibility(note) - - if note_visibility == ap.Visibility.PUBLIC: - to = [ap.AS_PUBLIC] - cc = [ID + "/followers", note.get_actor().id] - elif note_visibility == ap.Visibility.UNLISTED: - to = [ID + "/followers", note.get_actor().id] - cc = [ap.AS_PUBLIC] - else: - to = [note.get_actor().id] - - like = ap.Like(object=note.id, actor=MY_PERSON.id, to=to, cc=cc, published=now()) - - like_id = post_to_outbox(like) - - return _user_api_response(activity=like_id) - - -@app.route("/api/bookmark", methods=["POST"]) -@api_required -def api_bookmark(): - note = _user_api_get_note() - - undo = _user_api_arg("undo", default=None) == "yes" - - # Try to bookmark the `Create` first - if not DB.activities.update_one( - {"activity.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}} - ).modified_count: - # Then look for the `Announce` - DB.activities.update_one( - {"meta.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}} - ) - - return _user_api_response() - - -@app.route("/api/note/pin", methods=["POST"]) -@api_required -def api_pin(): - note = _user_api_get_note(from_outbox=True) - - DB.activities.update_one( - {"activity.object.id": note.id, "box": Box.OUTBOX.value}, - {"$set": {"meta.pinned": True}}, - ) - - return _user_api_response(pinned=True) - - -@app.route("/api/note/unpin", methods=["POST"]) -@api_required -def api_unpin(): - note = _user_api_get_note(from_outbox=True) - - DB.activities.update_one( - {"activity.object.id": note.id, "box": Box.OUTBOX.value}, - {"$set": {"meta.pinned": False}}, - ) - - return _user_api_response(pinned=False) - - -@app.route("/api/undo", methods=["POST"]) -@api_required -def api_undo(): - oid = _user_api_arg("id") - doc = DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "$or": [{"remote_id": back.activity_url(oid)}, {"remote_id": oid}], - } - ) - if not doc: - raise ActivityNotFoundError(f"cannot found {oid}") - - obj = ap.parse_activity(doc.get("activity")) - - undo = ap.Undo( - actor=MY_PERSON.id, - object=obj.to_dict(embed=True, embed_object_id_only=True), - published=now(), - to=obj.to, - cc=obj.cc, - ) - - # FIXME(tsileo): detect already undo-ed and make this API call idempotent - undo_id = post_to_outbox(undo) - - return _user_api_response(activity=undo_id) - - -@app.route("/admin/stream") -@login_required -def admin_stream(): - q = {"meta.stream": True, "meta.deleted": False} - - tpl = "stream.html" - if request.args.get("debug"): - tpl = "stream_debug.html" - if request.args.get("debug_inbox"): - q = {} - - inbox_data, older_than, newer_than = paginated_query( - DB.activities, q, limit=int(request.args.get("limit", 25)) - ) - - return render_template( - tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than - ) - - -@app.route("/admin/list/") -@login_required -def admin_list(name): - list_ = DB.lists.find_one({"name": name}) - if not list_: - abort(404) - - q = { - "meta.stream": True, - "meta.deleted": False, - "meta.actor_id": {"$in": list_["members"]}, - } - - tpl = "stream.html" - if request.args.get("debug"): - tpl = "stream_debug.html" - if 
request.args.get("debug_inbox"): - q = {} - - inbox_data, older_than, newer_than = paginated_query( - DB.activities, q, limit=int(request.args.get("limit", 25)) - ) - - return render_template( - tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than - ) - - -@app.route("/admin/bookmarks") -@login_required -def admin_bookmarks(): - q = {"meta.bookmarked": True} - - tpl = "stream.html" - if request.args.get("debug"): - tpl = "stream_debug.html" - if request.args.get("debug_inbox"): - q = {} - - inbox_data, older_than, newer_than = paginated_query( - DB.activities, q, limit=int(request.args.get("limit", 25)) - ) - - return render_template( - tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than - ) - - @app.route("/inbox", methods=["GET", "POST"]) # noqa: C901 def inbox(): # GET /inbox @@ -1729,293 +856,6 @@ def inbox(): return Response(status=201) -def without_id(l): - out = [] - for d in l: - if "_id" in d: - del d["_id"] - out.append(d) - return out - - -@app.route("/api/debug", methods=["GET", "DELETE"]) -@api_required -def api_debug(): - """Endpoint used/needed for testing, only works in DEBUG_MODE.""" - if not DEBUG_MODE: - return flask_jsonify(message="DEBUG_MODE is off") - - if request.method == "DELETE": - _drop_db() - return flask_jsonify(message="DB dropped") - - return flask_jsonify( - inbox=DB.activities.count({"box": Box.INBOX.value}), - outbox=DB.activities.count({"box": Box.OUTBOX.value}), - outbox_data=without_id(DB.activities.find({"box": Box.OUTBOX.value})), - ) - - -@app.route("/api/new_list", methods=["POST"]) -@api_required -def api_new_list(): - name = _user_api_arg("name") - if not name: - raise ValueError("missing name") - - if not DB.lists.find_one({"name": name}): - DB.lists.insert_one({"name": name, "members": []}) - - return _user_api_response(name=name) - - -@app.route("/api/delete_list", methods=["POST"]) -@api_required -def api_delete_list(): - name = _user_api_arg("name") - if not name: - raise ValueError("missing name") - - if not DB.lists.find_one({"name": name}): - abort(404) - - DB.lists.delete_one({"name": name}) - - return _user_api_response() - - -@app.route("/api/add_to_list", methods=["POST"]) -@api_required -def api_add_to_list(): - list_name = _user_api_arg("list_name") - if not list_name: - raise ValueError("missing list_name") - - if not DB.lists.find_one({"name": list_name}): - raise ValueError(f"list {list_name} does not exist") - - actor_id = _user_api_arg("actor_id") - if not actor_id: - raise ValueError("missing actor_id") - - DB.lists.update_one({"name": list_name}, {"$addToSet": {"members": actor_id}}) - - return _user_api_response() - - -@app.route("/api/remove_from_list", methods=["POST"]) -@api_required -def api_remove_from_list(): - list_name = _user_api_arg("list_name") - if not list_name: - raise ValueError("missing list_name") - - if not DB.lists.find_one({"name": list_name}): - raise ValueError(f"list {list_name} does not exist") - - actor_id = _user_api_arg("actor_id") - if not actor_id: - raise ValueError("missing actor_id") - - DB.lists.update_one({"name": list_name}, {"$pull": {"members": actor_id}}) - - return _user_api_response() - - -@app.route("/api/new_note", methods=["POST"]) -@api_required -def api_new_note(): - source = _user_api_arg("content") - if not source: - raise ValueError("missing content") - - _reply, reply = None, None - try: - _reply = _user_api_arg("reply") - except ValueError: - pass - - visibility = ap.Visibility[ - _user_api_arg("visibility", 
default=ap.Visibility.PUBLIC.name) - ] - - content, tags = parse_markdown(source) - - to, cc = [], [] - if visibility == ap.Visibility.PUBLIC: - to = [ap.AS_PUBLIC] - cc = [ID + "/followers"] - elif visibility == ap.Visibility.UNLISTED: - to = [ID + "/followers"] - cc = [ap.AS_PUBLIC] - elif visibility == ap.Visibility.FOLLOWERS_ONLY: - to = [ID + "/followers"] - cc = [] - - if _reply: - reply = ap.fetch_remote_activity(_reply) - if visibility == ap.Visibility.DIRECT: - to.append(reply.attributedTo) - else: - cc.append(reply.attributedTo) - - for tag in tags: - if tag["type"] == "Mention": - if visibility == ap.Visibility.DIRECT: - to.append(tag["href"]) - else: - cc.append(tag["href"]) - - raw_note = dict( - attributedTo=MY_PERSON.id, - cc=list(set(cc)), - to=list(set(to)), - content=content, - tag=tags, - source={"mediaType": "text/markdown", "content": source}, - inReplyTo=reply.id if reply else None, - ) - - if "file" in request.files and request.files["file"].filename: - file = request.files["file"] - rfilename = secure_filename(file.filename) - with BytesIO() as buf: - file.save(buf) - oid = MEDIA_CACHE.save_upload(buf, rfilename) - mtype = mimetypes.guess_type(rfilename)[0] - - raw_note["attachment"] = [ - { - "mediaType": mtype, - "name": rfilename, - "type": "Document", - "url": f"{BASE_URL}/uploads/{oid}/{rfilename}", - } - ] - - note = ap.Note(**raw_note) - create = note.build_create() - create_id = post_to_outbox(create) - - return _user_api_response(activity=create_id) - - -@app.route("/api/new_question", methods=["POST"]) -@api_required -def api_new_question(): - source = _user_api_arg("content") - if not source: - raise ValueError("missing content") - - content, tags = parse_markdown(source) - cc = [ID + "/followers"] - - for tag in tags: - if tag["type"] == "Mention": - cc.append(tag["href"]) - - answers = [] - for i in range(4): - a = _user_api_arg(f"answer{i}", default=None) - if not a: - break - answers.append( - { - "type": ActivityType.NOTE.value, - "name": a, - "replies": {"type": ActivityType.COLLECTION.value, "totalItems": 0}, - } - ) - - open_for = int(_user_api_arg("open_for")) - choices = { - "endTime": ap.format_datetime( - datetime.now(timezone.utc) + timedelta(minutes=open_for) - ) - } - of = _user_api_arg("of") - if of == "anyOf": - choices["anyOf"] = answers - else: - choices["oneOf"] = answers - - raw_question = dict( - attributedTo=MY_PERSON.id, - cc=list(set(cc)), - to=[ap.AS_PUBLIC], - content=content, - tag=tags, - source={"mediaType": "text/markdown", "content": source}, - inReplyTo=None, - **choices, - ) - - question = ap.Question(**raw_question) - create = question.build_create() - create_id = post_to_outbox(create) - - Tasks.update_question_outbox(create_id, open_for) - - return _user_api_response(activity=create_id) - - -@app.route("/api/stream") -@api_required -def api_stream(): - return Response( - response=json.dumps( - activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor")) - ), - headers={"Content-Type": "application/json"}, - ) - - -@app.route("/api/block", methods=["POST"]) -@api_required -def api_block(): - actor = _user_api_arg("actor") - - existing = DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "type": ActivityType.BLOCK.value, - "activity.object": actor, - "meta.undo": False, - } - ) - if existing: - return _user_api_response(activity=existing["activity"]["id"]) - - block = ap.Block(actor=MY_PERSON.id, object=actor) - block_id = post_to_outbox(block) - - return _user_api_response(activity=block_id) - - 
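
Note that the /api/* handlers removed in this hunk are not dropped: the new app.py (earlier in this patch) registers blueprints.api.blueprint with url_prefix="/api", so the same URLs are served from a blueprint module instead of app.py. Below is a minimal sketch of that blueprint pattern; the real blueprints/api.py is not shown in this patch, so the module-level `blueprint` name and the placeholder handler are assumptions for illustration only.

    import flask
    from flask import abort, request, session

    # Module-level blueprint object. app.py mounts it with
    # app.register_blueprint(blueprints.api.blueprint, url_prefix="/api"),
    # so this handler stays reachable at POST /api/follow, matching the
    # route removed from app.py in this hunk.
    blueprint = flask.Blueprint("api", __name__)


    @blueprint.route("/follow", methods=["POST"])
    def api_follow():
        # Placeholder body: the real handler builds an ap.Follow activity
        # and posts it to the outbox, as in the code removed below.
        if not session.get("logged_in"):
            abort(401)
        return flask.jsonify(actor=request.form.get("actor")), 201

Registering the blueprint with a url_prefix keeps client-facing URLs stable while the handlers move out of app.py into their own module.
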
-@app.route("/api/follow", methods=["POST"]) -@api_required -def api_follow(): - actor = _user_api_arg("actor") - - q = { - "box": Box.OUTBOX.value, - "type": ActivityType.FOLLOW.value, - "meta.undo": False, - "activity.object": actor, - } - - existing = DB.activities.find_one(q) - if existing: - return _user_api_response(activity=existing["activity"]["id"]) - - follow = ap.Follow( - actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now() - ) - follow_id = post_to_outbox(follow) - - return _user_api_response(activity=follow_id) - - @app.route("/followers") def followers(): q = {"box": Box.INBOX.value, "type": ActivityType.FOLLOW.value, "meta.undo": False} @@ -2124,6 +964,7 @@ def tags(tag): def featured(): if not is_api_request(): abort(404) + q = { "box": Box.OUTBOX.value, "type": ActivityType.CREATE.value, @@ -2163,228 +1004,6 @@ def liked(): ) -####### -# IndieAuth - - -def build_auth_resp(payload): - if request.headers.get("Accept") == "application/json": - return Response( - status=200, - headers={"Content-Type": "application/json"}, - response=json.dumps(payload), - ) - return Response( - status=200, - headers={"Content-Type": "application/x-www-form-urlencoded"}, - response=urlencode(payload), - ) - - -def _get_prop(props, name, default=None): - if name in props: - items = props.get(name) - if isinstance(items, list): - return items[0] - return items - return default - - -def get_client_id_data(url): - # FIXME(tsileo): ensure not localhost via `little_boxes.urlutils.is_url_valid` - data = mf2py.parse(url=url) - for item in data["items"]: - if "h-x-app" in item["type"] or "h-app" in item["type"]: - props = item.get("properties", {}) - print(props) - return dict( - logo=_get_prop(props, "logo"), - name=_get_prop(props, "name"), - url=_get_prop(props, "url"), - ) - return dict(logo=None, name=url, url=url) - - -@app.route("/indieauth/flow", methods=["POST"]) -@login_required -def indieauth_flow(): - auth = dict( - scope=" ".join(request.form.getlist("scopes")), - me=request.form.get("me"), - client_id=request.form.get("client_id"), - state=request.form.get("state"), - redirect_uri=request.form.get("redirect_uri"), - response_type=request.form.get("response_type"), - ts=datetime.now().timestamp(), - code=binascii.hexlify(os.urandom(8)).decode("utf-8"), - verified=False, - ) - - # XXX(tsileo): a whitelist for me values? 
- - # TODO(tsileo): redirect_uri checks - if not auth["redirect_uri"]: - abort(400) - - DB.indieauth.insert_one(auth) - - # FIXME(tsileo): fetch client ID and validate redirect_uri - red = f'{auth["redirect_uri"]}?code={auth["code"]}&state={auth["state"]}&me={auth["me"]}' - return redirect(red) - - -@app.route("/indieauth", methods=["GET", "POST"]) -def indieauth_endpoint(): - if request.method == "GET": - if not session.get("logged_in"): - return redirect(url_for("admin_login", next=request.url)) - - me = request.args.get("me") - # FIXME(tsileo): ensure me == ID - client_id = request.args.get("client_id") - redirect_uri = request.args.get("redirect_uri") - state = request.args.get("state", "") - response_type = request.args.get("response_type", "id") - scope = request.args.get("scope", "").split() - - print("STATE", state) - return render_template( - "indieauth_flow.html", - client=get_client_id_data(client_id), - scopes=scope, - redirect_uri=redirect_uri, - state=state, - response_type=response_type, - client_id=client_id, - me=me, - ) - - # Auth verification via POST - code = request.form.get("code") - redirect_uri = request.form.get("redirect_uri") - client_id = request.form.get("client_id") - - ip, geoip = _get_ip() - - auth = DB.indieauth.find_one_and_update( - { - "code": code, - "redirect_uri": redirect_uri, - "client_id": client_id, - "verified": False, - }, - { - "$set": { - "verified": True, - "verified_by": "id", - "verified_at": datetime.now().timestamp(), - "ip_address": ip, - "geoip": geoip, - } - }, - ) - print(auth) - print(code, redirect_uri, client_id) - - # Ensure the code is recent - if (datetime.now() - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5): - abort(400) - - if not auth: - abort(403) - return - - session["logged_in"] = True - me = auth["me"] - state = auth["state"] - scope = auth["scope"] - print("STATE", state) - return build_auth_resp({"me": me, "state": state, "scope": scope}) - - -@app.route("/token", methods=["GET", "POST"]) -def token_endpoint(): - # Generate a new token with the returned access code - if request.method == "POST": - code = request.form.get("code") - me = request.form.get("me") - redirect_uri = request.form.get("redirect_uri") - client_id = request.form.get("client_id") - - now = datetime.now() - ip, geoip = _get_ip() - - # This query ensure code, client_id, redirect_uri and me are matching with the code request - auth = DB.indieauth.find_one_and_update( - { - "code": code, - "me": me, - "redirect_uri": redirect_uri, - "client_id": client_id, - "verified": False, - }, - { - "$set": { - "verified": True, - "verified_by": "code", - "verified_at": now.timestamp(), - "ip_address": ip, - "geoip": geoip, - } - }, - ) - - if not auth: - abort(403) - - scope = auth["scope"].split() - - # Ensure there's at least one scope - if not len(scope): - abort(400) - - # Ensure the code is recent - if (now - datetime.fromtimestamp(auth["ts"])) > timedelta(minutes=5): - abort(400) - - payload = dict(me=me, client_id=client_id, scope=scope, ts=now.timestamp()) - token = JWT.dumps(payload).decode("utf-8") - DB.indieauth.update_one( - {"_id": auth["_id"]}, - { - "$set": { - "token": token, - "token_expires": (now + timedelta(minutes=30)).timestamp(), - } - }, - ) - - return build_auth_resp( - {"me": me, "scope": auth["scope"], "access_token": token} - ) - - # Token verification - token = request.headers.get("Authorization").replace("Bearer ", "") - try: - payload = JWT.loads(token) - except BadSignature: - abort(403) - - # Check the token 
expritation (valid for 3 hours) - if (datetime.now() - datetime.fromtimestamp(payload["ts"])) > timedelta( - minutes=180 - ): - abort(401) - - return build_auth_resp( - { - "me": payload["me"], - "scope": " ".join(payload["scope"]), - "client_id": payload["client_id"], - } - ) - - ################# # Feeds @@ -2413,191 +1032,6 @@ def rss_feed(): ) -@app.route("/task/fetch_og_meta", methods=["POST"]) -def task_fetch_og_meta(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - if activity.has_type(ap.ActivityType.CREATE): - note = activity.get_object() - links = opengraph.links_from_note(note.to_dict()) - og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links) - for og in og_metadata: - if not og.get("image"): - continue - MEDIA_CACHE.cache_og_image(og["image"], iri) - - app.logger.debug(f"OG metadata {og_metadata!r}") - DB.activities.update_one( - {"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}} - ) - - app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}") - except (ActivityGoneError, ActivityNotFoundError): - app.logger.exception(f"dropping activity {iri}, skip OG metedata") - return "" - except requests.exceptions.HTTPError as http_err: - if 400 <= http_err.response.status_code < 500: - app.logger.exception("bad request, no retry") - return "" - app.logger.exception("failed to fetch OG metadata") - raise TaskError() from http_err - except Exception as err: - app.logger.exception(f"failed to fetch OG metadata for {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/cache_object", methods=["POST"]) -def task_cache_object(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - obj = activity.get_object() - DB.activities.update_one( - {"remote_id": activity.id}, - { - "$set": { - "meta.object": obj.to_dict(embed=True), - "meta.object_actor": activitypub._actor_to_meta(obj.get_actor()), - } - }, - ) - except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): - DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) - app.logger.exception(f"flagging activity {iri} as deleted, no object caching") - except Exception as err: - app.logger.exception(f"failed to cache object for {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 -def task_finish_post_to_outbox(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - - recipients = activity.recipients() - - if activity.has_type(ap.ActivityType.DELETE): - back.outbox_delete(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.UPDATE): - back.outbox_update(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.CREATE): - back.outbox_create(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.ANNOUNCE): - back.outbox_announce(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.LIKE): - back.outbox_like(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.UNDO): - obj = activity.get_object() - if obj.has_type(ap.ActivityType.LIKE): - back.outbox_undo_like(MY_PERSON, obj) - elif obj.has_type(ap.ActivityType.ANNOUNCE): - back.outbox_undo_announce(MY_PERSON, obj) - elif 
obj.has_type(ap.ActivityType.FOLLOW): - back.undo_new_following(MY_PERSON, obj) - - app.logger.info(f"recipients={recipients}") - activity = ap.clean_activity(activity.to_dict()) - - DB.cache2.remove() - - payload = json.dumps(activity) - for recp in recipients: - app.logger.debug(f"posting to {recp}") - Tasks.post_to_remote_inbox(payload, recp) - except (ActivityGoneError, ActivityNotFoundError): - app.logger.exception(f"no retry") - except Exception as err: - app.logger.exception(f"failed to post to remote inbox for {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 -def task_finish_post_to_inbox(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - - if activity.has_type(ap.ActivityType.DELETE): - back.inbox_delete(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.UPDATE): - back.inbox_update(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.CREATE): - back.inbox_create(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.ANNOUNCE): - back.inbox_announce(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.LIKE): - back.inbox_like(MY_PERSON, activity) - elif activity.has_type(ap.ActivityType.FOLLOW): - # Reply to a Follow with an Accept - actor_id = activity.get_actor().id - accept = ap.Accept( - actor=ID, - object={ - "type": "Follow", - "id": activity.id, - "object": activity.get_object_id(), - "actor": actor_id, - }, - to=[actor_id], - published=now(), - ) - post_to_outbox(accept) - elif activity.has_type(ap.ActivityType.UNDO): - obj = activity.get_object() - if obj.has_type(ap.ActivityType.LIKE): - back.inbox_undo_like(MY_PERSON, obj) - elif obj.has_type(ap.ActivityType.ANNOUNCE): - back.inbox_undo_announce(MY_PERSON, obj) - elif obj.has_type(ap.ActivityType.FOLLOW): - back.undo_new_follower(MY_PERSON, obj) - try: - invalidate_cache(activity) - except Exception: - app.logger.exception("failed to invalidate cache") - except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): - app.logger.exception(f"no retry") - except Exception as err: - app.logger.exception(f"failed to cache attachments for {iri}") - raise TaskError() from err - - return "" - - -def post_to_outbox(activity: ap.BaseActivity) -> str: - if activity.has_type(ap.CREATE_TYPES): - activity = activity.build_create() - - # Assign create a random ID - obj_id = back.random_object_id() - - activity.set_id(back.activity_url(obj_id), obj_id) - - back.save(Box.OUTBOX, activity) - Tasks.cache_actor(activity.id) - Tasks.finish_post_to_outbox(activity.id) - return activity.id - - def post_to_inbox(activity: ap.BaseActivity) -> None: # Check for Block activity actor = activity.get_actor() @@ -2617,335 +1051,3 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: app.logger.info(f"spawning task for {activity!r}") Tasks.finish_post_to_inbox(activity.id) - - -def invalidate_cache(activity): - if activity.has_type(ap.ActivityType.LIKE): - if activity.get_object().id.startswith(BASE_URL): - DB.cache2.remove() - elif activity.has_type(ap.ActivityType.ANNOUNCE): - if activity.get_object().id.startswith(BASE_URL): - DB.cache2.remove() - elif activity.has_type(ap.ActivityType.UNDO): - DB.cache2.remove() - elif activity.has_type(ap.ActivityType.DELETE): - # TODO(tsileo): only invalidate if it's a delete of a reply - DB.cache2.remove() - elif activity.has_type(ap.ActivityType.UPDATE): - 
DB.cache2.remove() - elif activity.has_type(ap.ActivityType.CREATE): - note = activity.get_object() - in_reply_to = note.get_in_reply_to() - if not in_reply_to or in_reply_to.startswith(ID): - DB.cache2.remove() - # FIXME(tsileo): check if it's a reply of a reply - - -@app.route("/task/cache_attachments", methods=["POST"]) -def task_cache_attachments(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - # Generates thumbnails for the actor's icon and the attachments if any - - obj = activity.get_object() - - # Iter the attachments - for attachment in obj._data.get("attachment", []): - try: - MEDIA_CACHE.cache_attachment(attachment, iri) - except ValueError: - app.logger.exception(f"failed to cache {attachment}") - - app.logger.info(f"attachments cached for {iri}") - - except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): - app.logger.exception(f"dropping activity {iri}, no attachment caching") - except Exception as err: - app.logger.exception(f"failed to cache attachments for {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/cache_actor", methods=["POST"]) -def task_cache_actor() -> str: - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload["iri"] - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - - # Fetch the Open Grah metadata if it's a `Create` - if activity.has_type(ap.ActivityType.CREATE): - Tasks.fetch_og_meta(iri) - - actor = activity.get_actor() - if actor.icon: - if isinstance(actor.icon, dict) and "url" in actor.icon: - MEDIA_CACHE.cache_actor_icon(actor.icon["url"]) - else: - app.logger.warning(f"failed to parse icon {actor.icon} for {iri}") - - if activity.has_type(ap.ActivityType.FOLLOW): - if actor.id == ID: - # It's a new following, cache the "object" (which is the actor we follow) - DB.activities.update_one( - {"remote_id": iri}, - { - "$set": { - "meta.object": activity.get_object().to_dict(embed=True) - } - }, - ) - - # Cache the actor info - DB.activities.update_one( - {"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}} - ) - - app.logger.info(f"actor cached for {iri}") - if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): - Tasks.cache_attachments(iri) - - except (ActivityGoneError, ActivityNotFoundError): - DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) - app.logger.exception(f"flagging activity {iri} as deleted, no actor caching") - except Exception as err: - app.logger.exception(f"failed to cache actor for {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 -def task_process_new_activity(): - """Process an activity received in the inbox""" - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - app.logger.info(f"activity={activity!r}") - - flags = {} - - if not activity.published: - flags[_meta(MetaKey.PUBLISHED)] = now() - else: - flags[_meta(MetaKey.PUBLISHED)] = activity.published - - set_inbox_flags(activity, flags) - app.logger.info(f"a={activity}, flags={flags!r}") - if flags: - DB.activities.update_one({"remote_id": activity.id}, {"$set": flags}) - - app.logger.info(f"new activity {iri} processed") - if not activity.has_type(ap.ActivityType.DELETE): - Tasks.cache_actor(iri) - except 
(ActivityGoneError, ActivityNotFoundError): - app.logger.exception(f"dropping activity {iri}, skip processing") - return "" - except Exception as err: - app.logger.exception(f"failed to process new activity {iri}") - raise TaskError() from err - - return "" - - -@app.route("/task/forward_activity", methods=["POST"]) -def task_forward_activity(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - activity = ap.fetch_remote_activity(iri) - recipients = back.followers_as_recipients() - app.logger.debug(f"Forwarding {activity!r} to {recipients}") - activity = ap.clean_activity(activity.to_dict()) - payload = json.dumps(activity) - for recp in recipients: - app.logger.debug(f"forwarding {activity!r} to {recp}") - Tasks.post_to_remote_inbox(payload, recp) - except Exception as err: - app.logger.exception("task failed") - raise TaskError() from err - - return "" - - -@app.route("/task/post_to_remote_inbox", methods=["POST"]) -def task_post_to_remote_inbox(): - """Post an activity to a remote inbox.""" - task = p.parse(request) - app.logger.info(f"task={task!r}") - payload, to = task.payload["payload"], task.payload["to"] - try: - app.logger.info("payload=%s", payload) - app.logger.info("generating sig") - signed_payload = json.loads(payload) - - # XXX Disable JSON-LD signature crap for now (as HTTP signatures are enough for most implementations) - # Don't overwrite the signature if we're forwarding an activity - # if "signature" not in signed_payload: - # generate_signature(signed_payload, KEY) - - app.logger.info("to=%s", to) - resp = requests.post( - to, - data=json.dumps(signed_payload), - auth=SIG_AUTH, - headers={ - "Content-Type": HEADERS[1], - "Accept": HEADERS[1], - "User-Agent": USER_AGENT, - }, - ) - app.logger.info("resp=%s", resp) - app.logger.info("resp_body=%s", resp.text) - resp.raise_for_status() - except HTTPError as err: - app.logger.exception("request failed") - if 400 >= err.response.status_code >= 499: - app.logger.info("client error, no retry") - return "" - - raise TaskError() from err - except Exception as err: - app.logger.exception("task failed") - raise TaskError() from err - - return "" - - -@app.route("/task/fetch_remote_question", methods=["POST"]) -def task_fetch_remote_question(): - """Fetch a remote question for implementation that does not send Update.""" - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - app.logger.info(f"Fetching remote question {iri}") - local_question = DB.activities.find_one( - { - "box": Box.INBOX.value, - "type": ActivityType.CREATE.value, - "activity.object.id": iri, - } - ) - remote_question = get_backend().fetch_iri(iri, no_cache=True) - # FIXME(tsileo): compute and set `meta.object_visiblity` (also update utils.py to do it) - if ( - local_question - and ( - local_question["meta"].get("voted_for") - or local_question["meta"].get("subscribed") - ) - and not DB.notifications.find_one({"activity.id": remote_question["id"]}) - ): - DB.notifications.insert_one( - { - "type": "question_ended", - "datetime": datetime.now(timezone.utc).isoformat(), - "activity": remote_question, - } - ) - - # Update the Create if we received it in the inbox - if local_question: - DB.activities.update_one( - {"remote_id": local_question["remote_id"], "box": Box.INBOX.value}, - {"$set": {"activity.object": remote_question}}, - ) - - # Also update all the cached copies (Like, Announce...) 
- DB.activities.update_many( - {"meta.object.id": remote_question["id"]}, - {"$set": {"meta.object": remote_question}}, - ) - - except HTTPError as err: - app.logger.exception("request failed") - if 400 >= err.response.status_code >= 499: - app.logger.info("client error, no retry") - return "" - - raise TaskError() from err - except Exception as err: - app.logger.exception("task failed") - raise TaskError() from err - - return "" - - -@app.route("/task/update_question", methods=["POST"]) -def task_update_question(): - """Sends an Update.""" - task = p.parse(request) - app.logger.info(f"task={task!r}") - iri = task.payload - try: - app.logger.info(f"Updating question {iri}") - cc = [ID + "/followers"] - doc = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri}) - _add_answers_to_question(doc) - question = ap.Question(**doc["activity"]["object"]) - - raw_update = dict( - actor=question.id, - object=question.to_dict(embed=True), - attributedTo=MY_PERSON.id, - cc=list(set(cc)), - to=[ap.AS_PUBLIC], - ) - raw_update["@context"] = config.DEFAULT_CTX - - update = ap.Update(**raw_update) - print(update) - print(update.to_dict()) - post_to_outbox(update) - - except HTTPError as err: - app.logger.exception("request failed") - if 400 >= err.response.status_code >= 499: - app.logger.info("client error, no retry") - return "" - - raise TaskError() from err - except Exception as err: - app.logger.exception("task failed") - raise TaskError() from err - - return "" - - -@app.route("/task/cleanup", methods=["POST"]) -def task_cleanup(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - activity_gc.perform() - return "" - - -def task_cleanup_part_1(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - return "OK" - - -@app.route("/task/cleanup_part_2", methods=["POST"]) -def task_cleanup_part_2(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - return "OK" - - -@app.route("/task/cleanup_part_3", methods=["POST"]) -def task_cleanup_part_3(): - task = p.parse(request) - app.logger.info(f"task={task!r}") - return "OK" diff --git a/app_utils.py b/app_utils.py index 3a7068f..cecc724 100644 --- a/app_utils.py +++ b/app_utils.py @@ -1,11 +1,39 @@ +import os +from datetime import datetime +from datetime import timezone +from functools import wraps +from typing import Any +from typing import Dict +from typing import Union + +import flask +import werkzeug +from bson.objectid import ObjectId +from flask import current_app as app +from flask import redirect +from flask import request +from flask import session +from flask import url_for from flask_wtf.csrf import CSRFProtect from little_boxes import activitypub as ap +from little_boxes.activitypub import format_datetime +from poussetaches import PousseTaches import activitypub from activitypub import Box +from activitypub import _answer_key +from config import DB from config import ME from tasks import Tasks +_Response = Union[flask.Response, werkzeug.wrappers.Response, str] + +p = PousseTaches( + os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"), + os.getenv("MICROBLOGPUB_INTERNAL_HOST", "http://localhost:5000"), +) + + csrf = CSRFProtect() @@ -15,6 +43,55 @@ ap.use_backend(back) MY_PERSON = ap.Person(**ME) +def add_response_headers(headers={}): + """This decorator adds the headers passed in to the response""" + + def decorator(f): + @wraps(f) + def decorated_function(*args, **kwargs): + resp = flask.make_response(f(*args, **kwargs)) + h = resp.headers + for header, value in headers.items(): 
+ h[header] = value + return resp + + return decorated_function + + return decorator + + +def noindex(f): + """This decorator passes X-Robots-Tag: noindex, nofollow""" + return add_response_headers({"X-Robots-Tag": "noindex, nofollow"})(f) + + +def login_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not session.get("logged_in"): + return redirect(url_for("admin_login", next=request.url)) + return f(*args, **kwargs) + + return decorated_function + + +def _get_ip(): + """Guess the IP address from the request. Only used for security purpose (failed logins or bad payload). + + Geoip will be returned if the "broxy" headers are set (it does Geoip + using an offline database and append these special headers). + """ + ip = request.headers.get("X-Forwarded-For", request.remote_addr) + geoip = None + if request.headers.get("Broxy-Geoip-Country"): + geoip = ( + request.headers.get("Broxy-Geoip-Country") + + "/" + + request.headers.get("Broxy-Geoip-Region") + ) + return ip, geoip + + def post_to_outbox(activity: ap.BaseActivity) -> str: if activity.has_type(ap.CREATE_TYPES): activity = activity.build_create() @@ -28,3 +105,128 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: Tasks.cache_actor(activity.id) Tasks.finish_post_to_outbox(activity.id) return activity.id + + +def _build_thread(data, include_children=True): # noqa: C901 + data["_requested"] = True + app.logger.info(f"_build_thread({data!r})") + root_id = data["meta"].get("thread_root_parent", data["activity"]["object"]["id"]) + + query = { + "$or": [{"meta.thread_root_parent": root_id}, {"activity.object.id": root_id}], + "meta.deleted": False, + } + replies = [data] + for dat in DB.activities.find(query): + print(dat["type"]) + if dat["type"][0] == ap.ActivityType.CREATE.value: + replies.append(dat) + if dat["type"][0] == ap.ActivityType.UPDATE.value: + continue + else: + # Make a Note/Question/... looks like a Create + dat = { + "activity": {"object": dat["activity"]}, + "meta": dat["meta"], + "_id": dat["_id"], + } + replies.append(dat) + + replies = sorted(replies, key=lambda d: d["activity"]["object"]["published"]) + + # Index all the IDs in order to build a tree + idx = {} + replies2 = [] + for rep in replies: + rep_id = rep["activity"]["object"]["id"] + if rep_id in idx: + continue + idx[rep_id] = rep.copy() + idx[rep_id]["_nodes"] = [] + replies2.append(rep) + + # Build the tree + for rep in replies2: + rep_id = rep["activity"]["object"]["id"] + if rep_id == root_id: + continue + reply_of = ap._get_id(rep["activity"]["object"].get("inReplyTo")) + try: + idx[reply_of]["_nodes"].append(rep) + except KeyError: + app.logger.info(f"{reply_of} is not there! skipping {rep}") + + # Flatten the tree + thread = [] + + def _flatten(node, level=0): + node["_level"] = level + thread.append(node) + + for snode in sorted( + idx[node["activity"]["object"]["id"]]["_nodes"], + key=lambda d: d["activity"]["object"]["published"], + ): + _flatten(snode, level=level + 1) + + try: + _flatten(idx[root_id]) + except KeyError: + app.logger.info(f"{root_id} is not there! 
skipping") + + return thread + + +def paginated_query(db, q, limit=25, sort_key="_id"): + older_than = newer_than = None + query_sort = -1 + first_page = not request.args.get("older_than") and not request.args.get( + "newer_than" + ) + + query_older_than = request.args.get("older_than") + query_newer_than = request.args.get("newer_than") + + if query_older_than: + q["_id"] = {"$lt": ObjectId(query_older_than)} + elif query_newer_than: + q["_id"] = {"$gt": ObjectId(query_newer_than)} + query_sort = 1 + + outbox_data = list(db.find(q, limit=limit + 1).sort(sort_key, query_sort)) + outbox_len = len(outbox_data) + outbox_data = sorted( + outbox_data[:limit], key=lambda x: str(x[sort_key]), reverse=True + ) + + if query_older_than: + newer_than = str(outbox_data[0]["_id"]) + if outbox_len == limit + 1: + older_than = str(outbox_data[-1]["_id"]) + elif query_newer_than: + older_than = str(outbox_data[-1]["_id"]) + if outbox_len == limit + 1: + newer_than = str(outbox_data[0]["_id"]) + elif first_page and outbox_len == limit + 1: + older_than = str(outbox_data[-1]["_id"]) + + return outbox_data, older_than, newer_than + + +def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None: + activity = raw_doc["activity"] + if ( + ap._has_type(activity["type"], ap.ActivityType.CREATE) + and "object" in activity + and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION) + ): + for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")): + choice["replies"] = { + "type": ap.ActivityType.COLLECTION.value, + "totalItems": raw_doc["meta"] + .get("question_answers", {}) + .get(_answer_key(choice["name"]), 0), + } + now = datetime.now(timezone.utc) + if format_datetime(now) >= activity["object"]["endTime"]: + activity["object"]["closed"] = activity["object"]["endTime"] diff --git a/blueprints/__init__.py b/blueprints/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/blueprints/admin.py b/blueprints/admin.py new file mode 100644 index 0000000..2ecc948 --- /dev/null +++ b/blueprints/admin.py @@ -0,0 +1,414 @@ +import json +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from typing import Any +from typing import List +from urllib.parse import urlparse + +import flask +from flask import abort +from flask import current_app as app +from flask import redirect +from flask import render_template +from flask import request +from flask import session +from flask import url_for +from little_boxes import activitypub as ap +from passlib.hash import bcrypt +from u2flib_server import u2f + +import config +from activitypub import Box +from app_utils import MY_PERSON +from app_utils import _build_thread +from app_utils import _Response +from app_utils import csrf +from app_utils import login_required +from app_utils import noindex +from app_utils import p +from app_utils import paginated_query +from app_utils import post_to_outbox +from config import DB +from config import ID +from config import PASS +from utils import now +from utils.lookup import lookup + +blueprint = flask.Blueprint("admin", __name__) + + +def verify_pass(pwd): + return bcrypt.verify(pwd, PASS) + + +@blueprint.route("/admin/update_actor") +@login_required +def admin_update_actor() -> _Response: + update = ap.Update( + actor=MY_PERSON.id, + object=MY_PERSON.to_dict(), + to=[MY_PERSON.followers], + cc=[ap.AS_PUBLIC], + published=now(), + ) + + post_to_outbox(update) + return "OK" + + +@blueprint.route("/admin/logout") +@login_required +def admin_logout() -> 
_Response: + session["logged_in"] = False + return redirect("/") + + +@blueprint.route("/login", methods=["POST", "GET"]) +@noindex +def admin_login() -> _Response: + if session.get("logged_in") is True: + return redirect(url_for("admin_notifications")) + + devices = [doc["device"] for doc in DB.u2f.find()] + u2f_enabled = True if devices else False + if request.method == "POST": + csrf.protect() + # 1. Check regular password login flow + pwd = request.form.get("pass") + if pwd: + if verify_pass(pwd): + session["logged_in"] = True + return redirect( + request.args.get("redirect") or url_for("admin_notifications") + ) + else: + abort(403) + # 2. Check for U2F payload, if any + elif devices: + resp = json.loads(request.form.get("resp")) # type: ignore + try: + u2f.complete_authentication(session["challenge"], resp) + except ValueError as exc: + print("failed", exc) + abort(403) + return + finally: + session["challenge"] = None + + session["logged_in"] = True + return redirect( + request.args.get("redirect") or url_for("admin_notifications") + ) + else: + abort(401) + + payload = None + if devices: + payload = u2f.begin_authentication(ID, devices) + session["challenge"] = payload + + return render_template("login.html", u2f_enabled=u2f_enabled, payload=payload) + + +@blueprint.route("/admin", methods=["GET"]) +@login_required +def admin_index() -> _Response: + q = { + "meta.deleted": False, + "meta.undo": False, + "type": ap.ActivityType.LIKE.value, + "box": Box.OUTBOX.value, + } + col_liked = DB.activities.count(q) + + return render_template( + "admin.html", + instances=list(DB.instances.find()), + inbox_size=DB.activities.count({"box": Box.INBOX.value}), + outbox_size=DB.activities.count({"box": Box.OUTBOX.value}), + col_liked=col_liked, + col_followers=DB.activities.count( + { + "box": Box.INBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + ), + col_following=DB.activities.count( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + ), + ) + + +@blueprint.route("/admin/indieauth", methods=["GET"]) +@login_required +def admin_indieauth() -> _Response: + return render_template( + "admin_indieauth.html", + indieauth_actions=DB.indieauth.find().sort("ts", -1).limit(100), + ) + + +@blueprint.route("/admin/tasks", methods=["GET"]) +@login_required +def admin_tasks() -> _Response: + return render_template( + "admin_tasks.html", + success=p.get_success(), + dead=p.get_dead(), + waiting=p.get_waiting(), + cron=p.get_cron(), + ) + + +@blueprint.route("/admin/lookup", methods=["GET", "POST"]) +@login_required +def admin_lookup() -> _Response: + data = None + meta = None + if request.method == "POST": + if request.form.get("url"): + data = lookup(request.form.get("url")) # type: ignore + if data: + if data.has_type(ap.ActivityType.ANNOUNCE): + meta = dict( + object=data.get_object().to_dict(), + object_actor=data.get_object().get_actor().to_dict(), + actor=data.get_actor().to_dict(), + ) + + elif data.has_type(ap.ActivityType.QUESTION): + p.push(data.id, "/task/fetch_remote_question") + + print(data) + app.logger.debug(data.to_dict()) + return render_template( + "lookup.html", data=data, meta=meta, url=request.form.get("url") + ) + + +@blueprint.route("/admin/thread") +@login_required +def admin_thread() -> _Response: + data = DB.activities.find_one( + { + "type": ap.ActivityType.CREATE.value, + "activity.object.id": request.args.get("oid"), + } + ) + + if not data: + abort(404) + if data["meta"].get("deleted", False): + 
abort(410) + thread = _build_thread(data) + + tpl = "note.html" + if request.args.get("debug"): + tpl = "note_debug.html" + return render_template(tpl, thread=thread, note=data) + + +@blueprint.route("/admin/new", methods=["GET"]) +@login_required +def admin_new() -> _Response: + reply_id = None + content = "" + thread: List[Any] = [] + print(request.args) + if request.args.get("reply"): + data = DB.activities.find_one({"activity.object.id": request.args.get("reply")}) + if data: + reply = ap.parse_activity(data["activity"]) + else: + data = dict( + meta={}, + activity=dict( + object=ap.get_backend().fetch_iri(request.args.get("reply")) + ), + ) + reply = ap.parse_activity(data["activity"]["object"]) + + reply_id = reply.id + if reply.ACTIVITY_TYPE == ap.ActivityType.CREATE: + reply_id = reply.get_object().id + actor = reply.get_actor() + domain = urlparse(actor.id).netloc + # FIXME(tsileo): if reply of reply, fetch all participants + content = f"@{actor.preferredUsername}@{domain} " + thread = _build_thread(data) + + return render_template( + "new.html", + reply=reply_id, + content=content, + thread=thread, + visibility=ap.Visibility, + emojis=config.EMOJIS.split(" "), + ) + + +@blueprint.route("/admin/lists", methods=["GET"]) +@login_required +def admin_lists() -> _Response: + lists = list(DB.lists.find()) + + return render_template("lists.html", lists=lists) + + +@blueprint.route("/admin/notifications") +@login_required +def admin_notifications() -> _Response: + # Setup the cron for deleting old activities + + # FIXME(tsileo): put back to 12h + p.push({}, "/task/cleanup", schedule="@every 1h") + + # Trigger a cleanup if asked + if request.args.get("cleanup"): + p.push({}, "/task/cleanup") + + # FIXME(tsileo): show unfollow (performed by the current actor) and liked??? 
+ mentions_query = { + "type": ap.ActivityType.CREATE.value, + "activity.object.tag.type": "Mention", + "activity.object.tag.name": f"@{config.USERNAME}@{config.DOMAIN}", + "meta.deleted": False, + } + replies_query = { + "type": ap.ActivityType.CREATE.value, + "activity.object.inReplyTo": {"$regex": f"^{config.BASE_URL}"}, + "meta.poll_answer": False, + } + announced_query = { + "type": ap.ActivityType.ANNOUNCE.value, + "activity.object": {"$regex": f"^{config.BASE_URL}"}, + } + new_followers_query = {"type": ap.ActivityType.FOLLOW.value} + unfollow_query = { + "type": ap.ActivityType.UNDO.value, + "activity.object.type": ap.ActivityType.FOLLOW.value, + } + likes_query = { + "type": ap.ActivityType.LIKE.value, + "activity.object": {"$regex": f"^{config.BASE_URL}"}, + } + followed_query = {"type": ap.ActivityType.ACCEPT.value} + q = { + "box": Box.INBOX.value, + "$or": [ + mentions_query, + announced_query, + replies_query, + new_followers_query, + followed_query, + unfollow_query, + likes_query, + ], + } + inbox_data, older_than, newer_than = paginated_query(DB.activities, q) + if not newer_than: + nstart = datetime.now(timezone.utc).isoformat() + else: + nstart = inbox_data[0]["_id"].generation_time.isoformat() + if not older_than: + nend = (datetime.now(timezone.utc) - timedelta(days=15)).isoformat() + else: + nend = inbox_data[-1]["_id"].generation_time.isoformat() + print(nstart, nend) + notifs = list( + DB.notifications.find({"datetime": {"$lte": nstart, "$gt": nend}}) + .sort("_id", -1) + .limit(50) + ) + print(inbox_data) + + nid = None + if inbox_data: + nid = inbox_data[0]["_id"] + + inbox_data.extend(notifs) + inbox_data = sorted( + inbox_data, reverse=True, key=lambda doc: doc["_id"].generation_time + ) + + return render_template( + "stream.html", + inbox_data=inbox_data, + older_than=older_than, + newer_than=newer_than, + nid=nid, + ) + + +@blueprint.route("/admin/stream") +@login_required +def admin_stream() -> _Response: + q = {"meta.stream": True, "meta.deleted": False} + + tpl = "stream.html" + if request.args.get("debug"): + tpl = "stream_debug.html" + if request.args.get("debug_inbox"): + q = {} + + inbox_data, older_than, newer_than = paginated_query( + DB.activities, q, limit=int(request.args.get("limit", 25)) + ) + + return render_template( + tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than + ) + + +@blueprint.route("/admin/list/") +@login_required +def admin_list(name: str) -> _Response: + list_ = DB.lists.find_one({"name": name}) + if not list_: + abort(404) + + q = { + "meta.stream": True, + "meta.deleted": False, + "meta.actor_id": {"$in": list_["members"]}, + } + + tpl = "stream.html" + if request.args.get("debug"): + tpl = "stream_debug.html" + if request.args.get("debug_inbox"): + q = {} + + inbox_data, older_than, newer_than = paginated_query( + DB.activities, q, limit=int(request.args.get("limit", 25)) + ) + + return render_template( + tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than + ) + + +@blueprint.route("/admin/bookmarks") +@login_required +def admin_bookmarks() -> _Response: + q = {"meta.bookmarked": True} + + tpl = "stream.html" + if request.args.get("debug"): + tpl = "stream_debug.html" + if request.args.get("debug_inbox"): + q = {} + + inbox_data, older_than, newer_than = paginated_query( + DB.activities, q, limit=int(request.args.get("limit", 25)) + ) + + return render_template( + tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than + ) diff --git a/blueprints/api.py 
b/blueprints/api.py new file mode 100644 index 0000000..52aec64 --- /dev/null +++ b/blueprints/api.py @@ -0,0 +1,585 @@ +import json +import mimetypes +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from functools import wraps +from io import BytesIO +from typing import Any +from typing import List + +import flask +from bson.objectid import ObjectId +from flask import Response +from flask import abort +from flask import current_app as app +from flask import redirect +from flask import request +from flask import session +from itsdangerous import BadSignature +from little_boxes import activitypub as ap +from little_boxes.content_helper import parse_markdown +from little_boxes.errors import ActivityNotFoundError +from little_boxes.errors import NotFromOutboxError +from werkzeug.utils import secure_filename + +import activitypub +import config +from activitypub import Box +from app_utils import MY_PERSON +from app_utils import _Response +from app_utils import back +from app_utils import csrf +from app_utils import post_to_outbox +from config import BASE_URL +from config import DB +from config import DEBUG_MODE +from config import ID +from config import JWT +from config import MEDIA_CACHE +from config import _drop_db +from tasks import Tasks +from utils import now +from utils.meta import MetaKey +from utils.meta import _meta + +blueprint = flask.Blueprint("api", __name__) + + +def without_id(l): + out = [] + for d in l: + if "_id" in d: + del d["_id"] + out.append(d) + return out + + +def _api_required() -> None: + if session.get("logged_in"): + if request.method not in ["GET", "HEAD"]: + # If a standard API request is made with a "login session", it must havw a CSRF token + csrf.protect() + return + + # Token verification + token = request.headers.get("Authorization", "").replace("Bearer ", "") + if not token: + # IndieAuth token + token = request.form.get("access_token", "") + + # Will raise a BadSignature on bad auth + payload = JWT.loads(token) + app.logger.info(f"api call by {payload}") + + +def api_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + try: + _api_required() + except BadSignature: + abort(401) + + return f(*args, **kwargs) + + return decorated_function + + +def _user_api_arg(key: str, **kwargs) -> Any: + """Try to get the given key from the requests, try JSON body, form data and query arg.""" + if request.is_json: + oid = request.json.get(key) + else: + oid = request.args.get(key) or request.form.get(key) + + if not oid: + if "default" in kwargs: + app.logger.info(f'{key}={kwargs.get("default")}') + return kwargs.get("default") + + raise ValueError(f"missing {key}") + + app.logger.info(f"{key}={oid}") + return oid + + +def _user_api_get_note(from_outbox: bool = False) -> ap.BaseActivity: + oid = _user_api_arg("id") + app.logger.info(f"fetching {oid}") + note = ap.parse_activity(ap.get_backend().fetch_iri(oid)) + if from_outbox and not note.id.startswith(ID): + raise NotFromOutboxError( + f"cannot load {note.id}, id must be owned by the server" + ) + + return note + + +def _user_api_response(**kwargs) -> _Response: + _redirect = _user_api_arg("redirect", default=None) + if _redirect: + return redirect(_redirect) + + resp = flask.jsonify(**kwargs) + resp.status_code = 201 + return resp + + +@blueprint.route("/note/delete", methods=["POST"]) +@api_required +def api_delete() -> _Response: + """API endpoint to delete a Note activity.""" + note = _user_api_get_note(from_outbox=True) + + # Create the delete, same 
audience as the Create object + delete = ap.Delete( + actor=ID, + object=ap.Tombstone(id=note.id).to_dict(embed=True), + to=note.to, + cc=note.cc, + published=now(), + ) + + delete_id = post_to_outbox(delete) + + return _user_api_response(activity=delete_id) + + +@blueprint.route("/boost", methods=["POST"]) +@api_required +def api_boost() -> _Response: + note = _user_api_get_note() + + # Ensures the note visibility allow us to build an Announce (in respect to the post visibility) + if ap.get_visibility(note) not in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: + abort(400) + + announce = ap.Announce( + actor=MY_PERSON.id, + object=note.id, + to=[MY_PERSON.followers, note.attributedTo], + cc=[ap.AS_PUBLIC], + published=now(), + ) + announce_id = post_to_outbox(announce) + + return _user_api_response(activity=announce_id) + + +@blueprint.route("/mark_notifications_as_read", methods=["POST"]) +@api_required +def api_mark_notification_as_read() -> _Response: + nid = ObjectId(_user_api_arg("nid")) + + DB.activities.update_many( + {_meta(MetaKey.NOTIFICATION_UNREAD): True, "_id": {"$lte": nid}}, + {"$set": {_meta(MetaKey.NOTIFICATION_UNREAD): False}}, + ) + + return _user_api_response() + + +@blueprint.route("/vote", methods=["POST"]) +@api_required +def api_vote() -> _Response: + oid = _user_api_arg("id") + app.logger.info(f"fetching {oid}") + note = ap.parse_activity(ap.get_backend().fetch_iri(oid)) + choice = _user_api_arg("choice") + + raw_note = dict( + attributedTo=MY_PERSON.id, + cc=[], + to=note.get_actor().id, + name=choice, + tag=[], + inReplyTo=note.id, + ) + raw_note["@context"] = config.DEFAULT_CTX + + note = ap.Note(**raw_note) + create = note.build_create() + create_id = post_to_outbox(create) + + return _user_api_response(activity=create_id) + + +@blueprint.route("/like", methods=["POST"]) +@api_required +def api_like() -> _Response: + note = _user_api_get_note() + + to: List[str] = [] + cc: List[str] = [] + + note_visibility = ap.get_visibility(note) + + if note_visibility == ap.Visibility.PUBLIC: + to = [ap.AS_PUBLIC] + cc = [ID + "/followers", note.get_actor().id] + elif note_visibility == ap.Visibility.UNLISTED: + to = [ID + "/followers", note.get_actor().id] + cc = [ap.AS_PUBLIC] + else: + to = [note.get_actor().id] + + like = ap.Like(object=note.id, actor=MY_PERSON.id, to=to, cc=cc, published=now()) + + like_id = post_to_outbox(like) + + return _user_api_response(activity=like_id) + + +@blueprint.route("/bookmark", methods=["POST"]) +@api_required +def api_bookmark() -> _Response: + note = _user_api_get_note() + + undo = _user_api_arg("undo", default=None) == "yes" + + # Try to bookmark the `Create` first + if not DB.activities.update_one( + {"activity.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}} + ).modified_count: + # Then look for the `Announce` + DB.activities.update_one( + {"meta.object.id": note.id}, {"$set": {"meta.bookmarked": not undo}} + ) + + return _user_api_response() + + +@blueprint.route("/note/pin", methods=["POST"]) +@api_required +def api_pin() -> _Response: + note = _user_api_get_note(from_outbox=True) + + DB.activities.update_one( + {"activity.object.id": note.id, "box": Box.OUTBOX.value}, + {"$set": {"meta.pinned": True}}, + ) + + return _user_api_response(pinned=True) + + +@blueprint.route("/note/unpin", methods=["POST"]) +@api_required +def api_unpin() -> _Response: + note = _user_api_get_note(from_outbox=True) + + DB.activities.update_one( + {"activity.object.id": note.id, "box": Box.OUTBOX.value}, + {"$set": {"meta.pinned": 
False}}, + ) + + return _user_api_response(pinned=False) + + +@blueprint.route("/undo", methods=["POST"]) +@api_required +def api_undo() -> _Response: + oid = _user_api_arg("id") + doc = DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "$or": [{"remote_id": back.activity_url(oid)}, {"remote_id": oid}], + } + ) + if not doc: + raise ActivityNotFoundError(f"cannot found {oid}") + + obj = ap.parse_activity(doc.get("activity")) + + undo = ap.Undo( + actor=MY_PERSON.id, + object=obj.to_dict(embed=True, embed_object_id_only=True), + published=now(), + to=obj.to, + cc=obj.cc, + ) + + # FIXME(tsileo): detect already undo-ed and make this API call idempotent + undo_id = post_to_outbox(undo) + + return _user_api_response(activity=undo_id) + + +@blueprint.route("/new_list", methods=["POST"]) +@api_required +def api_new_list() -> _Response: + name = _user_api_arg("name") + if not name: + raise ValueError("missing name") + + if not DB.lists.find_one({"name": name}): + DB.lists.insert_one({"name": name, "members": []}) + + return _user_api_response(name=name) + + +@blueprint.route("/delete_list", methods=["POST"]) +@api_required +def api_delete_list() -> _Response: + name = _user_api_arg("name") + if not name: + raise ValueError("missing name") + + if not DB.lists.find_one({"name": name}): + abort(404) + + DB.lists.delete_one({"name": name}) + + return _user_api_response() + + +@blueprint.route("/add_to_list", methods=["POST"]) +@api_required +def api_add_to_list() -> _Response: + list_name = _user_api_arg("list_name") + if not list_name: + raise ValueError("missing list_name") + + if not DB.lists.find_one({"name": list_name}): + raise ValueError(f"list {list_name} does not exist") + + actor_id = _user_api_arg("actor_id") + if not actor_id: + raise ValueError("missing actor_id") + + DB.lists.update_one({"name": list_name}, {"$addToSet": {"members": actor_id}}) + + return _user_api_response() + + +@blueprint.route("/remove_from_list", methods=["POST"]) +@api_required +def api_remove_from_list() -> _Response: + list_name = _user_api_arg("list_name") + if not list_name: + raise ValueError("missing list_name") + + if not DB.lists.find_one({"name": list_name}): + raise ValueError(f"list {list_name} does not exist") + + actor_id = _user_api_arg("actor_id") + if not actor_id: + raise ValueError("missing actor_id") + + DB.lists.update_one({"name": list_name}, {"$pull": {"members": actor_id}}) + + return _user_api_response() + + +@blueprint.route("/new_note", methods=["POST"]) +@api_required +def api_new_note() -> _Response: + source = _user_api_arg("content") + if not source: + raise ValueError("missing content") + + _reply, reply = None, None + try: + _reply = _user_api_arg("reply") + except ValueError: + pass + + visibility = ap.Visibility[ + _user_api_arg("visibility", default=ap.Visibility.PUBLIC.name) + ] + + content, tags = parse_markdown(source) + + to: List[str] = [] + cc: List[str] = [] + + if visibility == ap.Visibility.PUBLIC: + to = [ap.AS_PUBLIC] + cc = [ID + "/followers"] + elif visibility == ap.Visibility.UNLISTED: + to = [ID + "/followers"] + cc = [ap.AS_PUBLIC] + elif visibility == ap.Visibility.FOLLOWERS_ONLY: + to = [ID + "/followers"] + cc = [] + + if _reply: + reply = ap.fetch_remote_activity(_reply) + if visibility == ap.Visibility.DIRECT: + to.append(reply.attributedTo) + else: + cc.append(reply.attributedTo) + + for tag in tags: + if tag["type"] == "Mention": + if visibility == ap.Visibility.DIRECT: + to.append(tag["href"]) + else: + cc.append(tag["href"]) + + raw_note = dict( + 
attributedTo=MY_PERSON.id, + cc=list(set(cc)), + to=list(set(to)), + content=content, + tag=tags, + source={"mediaType": "text/markdown", "content": source}, + inReplyTo=reply.id if reply else None, + ) + + if "file" in request.files and request.files["file"].filename: + file = request.files["file"] + rfilename = secure_filename(file.filename) + with BytesIO() as buf: + file.save(buf) + oid = MEDIA_CACHE.save_upload(buf, rfilename) + mtype = mimetypes.guess_type(rfilename)[0] + + raw_note["attachment"] = [ + { + "mediaType": mtype, + "name": rfilename, + "type": "Document", + "url": f"{BASE_URL}/uploads/{oid}/{rfilename}", + } + ] + + note = ap.Note(**raw_note) + create = note.build_create() + create_id = post_to_outbox(create) + + return _user_api_response(activity=create_id) + + +@blueprint.route("/new_question", methods=["POST"]) +@api_required +def api_new_question() -> _Response: + source = _user_api_arg("content") + if not source: + raise ValueError("missing content") + + content, tags = parse_markdown(source) + cc = [ID + "/followers"] + + for tag in tags: + if tag["type"] == "Mention": + cc.append(tag["href"]) + + answers = [] + for i in range(4): + a = _user_api_arg(f"answer{i}", default=None) + if not a: + break + answers.append( + { + "type": ap.ActivityType.NOTE.value, + "name": a, + "replies": {"type": ap.ActivityType.COLLECTION.value, "totalItems": 0}, + } + ) + + open_for = int(_user_api_arg("open_for")) + choices = { + "endTime": ap.format_datetime( + datetime.now(timezone.utc) + timedelta(minutes=open_for) + ) + } + of = _user_api_arg("of") + if of == "anyOf": + choices["anyOf"] = answers + else: + choices["oneOf"] = answers + + raw_question = dict( + attributedTo=MY_PERSON.id, + cc=list(set(cc)), + to=[ap.AS_PUBLIC], + content=content, + tag=tags, + source={"mediaType": "text/markdown", "content": source}, + inReplyTo=None, + **choices, + ) + + question = ap.Question(**raw_question) + create = question.build_create() + create_id = post_to_outbox(create) + + Tasks.update_question_outbox(create_id, open_for) + + return _user_api_response(activity=create_id) + + +@blueprint.route("/block", methods=["POST"]) +@api_required +def api_block() -> _Response: + actor = _user_api_arg("actor") + + existing = DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.BLOCK.value, + "activity.object": actor, + "meta.undo": False, + } + ) + if existing: + return _user_api_response(activity=existing["activity"]["id"]) + + block = ap.Block(actor=MY_PERSON.id, object=actor) + block_id = post_to_outbox(block) + + return _user_api_response(activity=block_id) + + +@blueprint.route("/follow", methods=["POST"]) +@api_required +def api_follow() -> _Response: + actor = _user_api_arg("actor") + + q = { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + "activity.object": actor, + } + + existing = DB.activities.find_one(q) + if existing: + return _user_api_response(activity=existing["activity"]["id"]) + + follow = ap.Follow( + actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now() + ) + follow_id = post_to_outbox(follow) + + return _user_api_response(activity=follow_id) + + +@blueprint.route("/debug", methods=["GET", "DELETE"]) +@api_required +def api_debug() -> _Response: + """Endpoint used/needed for testing, only works in DEBUG_MODE.""" + if not DEBUG_MODE: + return flask.jsonify(message="DEBUG_MODE is off") + + if request.method == "DELETE": + _drop_db() + return flask.jsonify(message="DB dropped") + + 
return flask.jsonify( + inbox=DB.activities.count({"box": Box.INBOX.value}), + outbox=DB.activities.count({"box": Box.OUTBOX.value}), + outbox_data=without_id(DB.activities.find({"box": Box.OUTBOX.value})), + ) + + +@blueprint.route("/stream") +@api_required +def api_stream() -> _Response: + return Response( + response=json.dumps( + activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor")) + ), + headers={"Content-Type": "application/json"}, + ) diff --git a/blueprints/tasks.py b/blueprints/tasks.py new file mode 100644 index 0000000..cff52a0 --- /dev/null +++ b/blueprints/tasks.py @@ -0,0 +1,496 @@ +import json +import traceback +from datetime import datetime +from datetime import timezone + +import flask +import requests +from flask import current_app as app +from little_boxes import activitypub as ap +from little_boxes.errors import ActivityGoneError +from little_boxes.errors import ActivityNotFoundError +from little_boxes.errors import NotAnActivityError +from little_boxes.httpsig import HTTPSigAuth +from requests.exceptions import HTTPError + +import activity_gc +import activitypub +import config +from activitypub import Box +from app_utils import MY_PERSON +from app_utils import _add_answers_to_question +from app_utils import back +from app_utils import p +from app_utils import post_to_outbox +from config import DB +from tasks import Tasks +from utils import now +from utils import opengraph +from utils.meta import MetaKey +from utils.meta import _meta +from utils.notifications import set_inbox_flags + +SIG_AUTH = HTTPSigAuth(config.KEY) + +blueprint = flask.Blueprint("tasks", __name__) + + +class TaskError(Exception): + """Raised to log the error for poussetaches.""" + + def __init__(self): + self.message = traceback.format_exc() + + +@blueprint.route("/task/update_question", methods=["POST"]) +def task_update_question(): + """Sends an Update.""" + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + app.logger.info(f"Updating question {iri}") + cc = [config.ID + "/followers"] + doc = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri}) + _add_answers_to_question(doc) + question = ap.Question(**doc["activity"]["object"]) + + raw_update = dict( + actor=question.id, + object=question.to_dict(embed=True), + attributedTo=MY_PERSON.id, + cc=list(set(cc)), + to=[ap.AS_PUBLIC], + ) + raw_update["@context"] = config.DEFAULT_CTX + + update = ap.Update(**raw_update) + print(update) + print(update.to_dict()) + post_to_outbox(update) + + except HTTPError as err: + app.logger.exception("request failed") + if 400 >= err.response.status_code >= 499: + app.logger.info("client error, no retry") + return "" + + raise TaskError() from err + except Exception as err: + app.logger.exception("task failed") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/fetch_og_meta", methods=["POST"]) +def task_fetch_og_meta(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + if activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + links = opengraph.links_from_note(note.to_dict()) + og_metadata = opengraph.fetch_og_metadata(config.USER_AGENT, links) + for og in og_metadata: + if not og.get("image"): + continue + config.MEDIA_CACHE.cache_og_image(og["image"], iri) + + app.logger.debug(f"OG metadata {og_metadata!r}") + DB.activities.update_one( + {"remote_id": iri}, 
{"$set": {"meta.og_metadata": og_metadata}} + ) + + app.logger.info(f"OG metadata fetched for {iri}: {og_metadata}") + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"dropping activity {iri}, skip OG metedata") + return "" + except requests.exceptions.HTTPError as http_err: + if 400 <= http_err.response.status_code < 500: + app.logger.exception("bad request, no retry") + return "" + app.logger.exception("failed to fetch OG metadata") + raise TaskError() from http_err + except Exception as err: + app.logger.exception(f"failed to fetch OG metadata for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/cache_object", methods=["POST"]) +def task_cache_object(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + obj = activity.get_object() + DB.activities.update_one( + {"remote_id": activity.id}, + { + "$set": { + "meta.object": obj.to_dict(embed=True), + "meta.object_actor": activitypub._actor_to_meta(obj.get_actor()), + } + }, + ) + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + app.logger.exception(f"flagging activity {iri} as deleted, no object caching") + except Exception as err: + app.logger.exception(f"failed to cache object for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 +def task_finish_post_to_outbox(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + recipients = activity.recipients() + + if activity.has_type(ap.ActivityType.DELETE): + back.outbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + back.outbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.outbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.outbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.outbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.outbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_following(MY_PERSON, obj) + + app.logger.info(f"recipients={recipients}") + activity = ap.clean_activity(activity.to_dict()) + + payload = json.dumps(activity) + for recp in recipients: + app.logger.debug(f"posting to {recp}") + Tasks.post_to_remote_inbox(payload, recp) + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"no retry") + except Exception as err: + app.logger.exception(f"failed to post to remote inbox for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901 +def task_finish_post_to_inbox(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + if activity.has_type(ap.ActivityType.DELETE): + back.inbox_delete(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.UPDATE): + 
back.inbox_update(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.CREATE): + back.inbox_create(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_announce(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.LIKE): + back.inbox_like(MY_PERSON, activity) + elif activity.has_type(ap.ActivityType.FOLLOW): + # Reply to a Follow with an Accept + actor_id = activity.get_actor().id + accept = ap.Accept( + actor=config.ID, + object={ + "type": "Follow", + "id": activity.id, + "object": activity.get_object_id(), + "actor": actor_id, + }, + to=[actor_id], + published=now(), + ) + post_to_outbox(accept) + elif activity.has_type(ap.ActivityType.UNDO): + obj = activity.get_object() + if obj.has_type(ap.ActivityType.LIKE): + back.inbox_undo_like(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.ANNOUNCE): + back.inbox_undo_announce(MY_PERSON, obj) + elif obj.has_type(ap.ActivityType.FOLLOW): + back.undo_new_follower(MY_PERSON, obj) + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"no retry") + except Exception as err: + app.logger.exception(f"failed to cache attachments for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/cache_attachments", methods=["POST"]) +def task_cache_attachments(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + # Generates thumbnails for the actor's icon and the attachments if any + + obj = activity.get_object() + + # Iter the attachments + for attachment in obj._data.get("attachment", []): + try: + config.MEDIA_CACHE.cache_attachment(attachment, iri) + except ValueError: + app.logger.exception(f"failed to cache {attachment}") + + app.logger.info(f"attachments cached for {iri}") + + except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): + app.logger.exception(f"dropping activity {iri}, no attachment caching") + except Exception as err: + app.logger.exception(f"failed to cache attachments for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/cache_actor", methods=["POST"]) +def task_cache_actor() -> str: + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload["iri"] + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + # Fetch the Open Grah metadata if it's a `Create` + if activity.has_type(ap.ActivityType.CREATE): + Tasks.fetch_og_meta(iri) + + actor = activity.get_actor() + if actor.icon: + if isinstance(actor.icon, dict) and "url" in actor.icon: + config.MEDIA_CACHE.cache_actor_icon(actor.icon["url"]) + else: + app.logger.warning(f"failed to parse icon {actor.icon} for {iri}") + + if activity.has_type(ap.ActivityType.FOLLOW): + if actor.id == config.ID: + # It's a new following, cache the "object" (which is the actor we follow) + DB.activities.update_one( + {"remote_id": iri}, + { + "$set": { + "meta.object": activity.get_object().to_dict(embed=True) + } + }, + ) + + # Cache the actor info + DB.activities.update_one( + {"remote_id": iri}, {"$set": {"meta.actor": actor.to_dict(embed=True)}} + ) + + app.logger.info(f"actor cached for {iri}") + if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): + Tasks.cache_attachments(iri) + + except (ActivityGoneError, ActivityNotFoundError): + DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}}) + 
app.logger.exception(f"flagging activity {iri} as deleted, no actor caching") + except Exception as err: + app.logger.exception(f"failed to cache actor for {iri}") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/forward_activity", methods=["POST"]) +def task_forward_activity(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + recipients = back.followers_as_recipients() + app.logger.debug(f"Forwarding {activity!r} to {recipients}") + activity = ap.clean_activity(activity.to_dict()) + payload = json.dumps(activity) + for recp in recipients: + app.logger.debug(f"forwarding {activity!r} to {recp}") + Tasks.post_to_remote_inbox(payload, recp) + except Exception as err: + app.logger.exception("task failed") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/post_to_remote_inbox", methods=["POST"]) +def task_post_to_remote_inbox(): + """Post an activity to a remote inbox.""" + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + payload, to = task.payload["payload"], task.payload["to"] + try: + app.logger.info("payload=%s", payload) + app.logger.info("generating sig") + signed_payload = json.loads(payload) + + # XXX Disable JSON-LD signature crap for now (as HTTP signatures are enough for most implementations) + # Don't overwrite the signature if we're forwarding an activity + # if "signature" not in signed_payload: + # generate_signature(signed_payload, KEY) + + app.logger.info("to=%s", to) + resp = requests.post( + to, + data=json.dumps(signed_payload), + auth=SIG_AUTH, + headers={ + "Content-Type": config.HEADERS[1], + "Accept": config.HEADERS[1], + "User-Agent": config.USER_AGENT, + }, + ) + app.logger.info("resp=%s", resp) + app.logger.info("resp_body=%s", resp.text) + resp.raise_for_status() + except HTTPError as err: + app.logger.exception("request failed") + if 400 >= err.response.status_code >= 499: + app.logger.info("client error, no retry") + return "" + + raise TaskError() from err + except Exception as err: + app.logger.exception("task failed") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/fetch_remote_question", methods=["POST"]) +def task_fetch_remote_question(): + """Fetch a remote question for implementation that does not send Update.""" + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + app.logger.info(f"Fetching remote question {iri}") + local_question = DB.activities.find_one( + { + "box": Box.INBOX.value, + "type": ap.ActivityType.CREATE.value, + "activity.object.id": iri, + } + ) + remote_question = ap.get_backend().fetch_iri(iri, no_cache=True) + # FIXME(tsileo): compute and set `meta.object_visiblity` (also update utils.py to do it) + if ( + local_question + and ( + local_question["meta"].get("voted_for") + or local_question["meta"].get("subscribed") + ) + and not DB.notifications.find_one({"activity.id": remote_question["id"]}) + ): + DB.notifications.insert_one( + { + "type": "question_ended", + "datetime": datetime.now(timezone.utc).isoformat(), + "activity": remote_question, + } + ) + + # Update the Create if we received it in the inbox + if local_question: + DB.activities.update_one( + {"remote_id": local_question["remote_id"], "box": Box.INBOX.value}, + {"$set": {"activity.object": remote_question}}, + ) + + # Also update all the cached copies (Like, Announce...) 
+ DB.activities.update_many( + {"meta.object.id": remote_question["id"]}, + {"$set": {"meta.object": remote_question}}, + ) + + except HTTPError as err: + app.logger.exception("request failed") + if 400 >= err.response.status_code >= 499: + app.logger.info("client error, no retry") + return "" + + raise TaskError() from err + except Exception as err: + app.logger.exception("task failed") + raise TaskError() from err + + return "" + + +@blueprint.route("/task/cleanup", methods=["POST"]) +def task_cleanup(): + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + activity_gc.perform() + return "" + + +@blueprint.route("/task/process_new_activity", methods=["POST"]) # noqa:c901 +def task_process_new_activity(): + """Process an activity received in the inbox""" + task = p.parse(flask.request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + activity = ap.fetch_remote_activity(iri) + app.logger.info(f"activity={activity!r}") + + flags = {} + + if not activity.published: + flags[_meta(MetaKey.PUBLISHED)] = now() + else: + flags[_meta(MetaKey.PUBLISHED)] = activity.published + + set_inbox_flags(activity, flags) + app.logger.info(f"a={activity}, flags={flags!r}") + if flags: + DB.activities.update_one({"remote_id": activity.id}, {"$set": flags}) + + app.logger.info(f"new activity {iri} processed") + if not activity.has_type(ap.ActivityType.DELETE): + Tasks.cache_actor(iri) + except (ActivityGoneError, ActivityNotFoundError): + app.logger.exception(f"dropping activity {iri}, skip processing") + return "" + except Exception as err: + app.logger.exception(f"failed to process new activity {iri}") + raise TaskError() from err + + return "" diff --git a/blueprints/well_known.py b/blueprints/well_known.py new file mode 100644 index 0000000..e8409a8 --- /dev/null +++ b/blueprints/well_known.py @@ -0,0 +1,101 @@ +import json +import mimetypes + +import flask +from flask import Response +from flask import abort +from flask import request +from little_boxes import activitypub as ap + +import config +from activitypub import Box +from app_utils import _Response +from config import DB + +blueprint = flask.Blueprint("well_known", __name__) + + +@blueprint.route("/.well-known/webfinger") +def wellknown_webfinger() -> _Response: + """Exposes/servers WebFinger data.""" + resource = request.args.get("resource") + if resource not in [f"acct:{config.USERNAME}@{config.DOMAIN}", config.ID]: + abort(404) + + out = { + "subject": f"acct:{config.USERNAME}@{config.DOMAIN}", + "aliases": [config.ID], + "links": [ + { + "rel": "http://webfinger.net/rel/profile-page", + "type": "text/html", + "href": config.ID, + }, + {"rel": "self", "type": "application/activity+json", "href": config.ID}, + { + "rel": "http://ostatus.org/schema/1.0/subscribe", + "template": config.BASE_URL + "/authorize_follow?profile={uri}", + }, + {"rel": "magic-public-key", "href": config.KEY.to_magic_key()}, + { + "href": config.ICON_URL, + "rel": "http://webfinger.net/rel/avatar", + "type": mimetypes.guess_type(config.ICON_URL)[0], + }, + ], + } + + return Response( + response=json.dumps(out), + headers={"Content-Type": "application/jrd+json; charset=utf-8"}, + ) + + +@blueprint.route("/.well-known/nodeinfo") +def wellknown_nodeinfo() -> _Response: + """Exposes the NodeInfo endpoint (http://nodeinfo.diaspora.software/).""" + return flask.jsonify( + links=[ + { + "rel": "http://nodeinfo.diaspora.software/ns/schema/2.1", + "href": f"{config.ID}/nodeinfo", + } + ] + ) + + +@blueprint.route("/nodeinfo") +def nodeinfo() -> 
_Response: + """NodeInfo endpoint.""" + q = { + "box": Box.OUTBOX.value, + "meta.deleted": False, + "type": {"$in": [ap.ActivityType.CREATE.value, ap.ActivityType.ANNOUNCE.value]}, + } + + response = json.dumps( + { + "version": "2.1", + "software": { + "name": "microblogpub", + "version": config.VERSION, + "repository": "https://github.com/tsileo/microblog.pub", + }, + "protocols": ["activitypub"], + "services": {"inbound": [], "outbound": []}, + "openRegistrations": False, + "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, + "metadata": { + "nodeName": f"@{config.USERNAME}@{config.DOMAIN}", + "version": config.VERSION, + "versionDate": config.VERSION_DATE, + }, + } + ) + + return Response( + headers={ + "Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.1#" + }, + response=response, + ) diff --git a/docker-compose.yml b/docker-compose.yml index 2be0c57..6889dce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,7 +18,7 @@ services: volumes: - "${DATA_DIR}/mongodb:/data/db" poussetaches: - image: "poussetaches:latest" + image: "poussetaches/poussetaches:latest" volumes: - "${DATA_DIR}/poussetaches:/app/poussetaches_data" environment: diff --git a/poussetaches.py b/poussetaches.py deleted file mode 100644 index aa6e6eb..0000000 --- a/poussetaches.py +++ /dev/null @@ -1,132 +0,0 @@ -import base64 -import json -import os -from dataclasses import dataclass -from datetime import datetime -from typing import Any -from typing import Dict -from typing import List - -import flask -import requests - -POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") - - -@dataclass -class Task: - req_id: str - tries: int - - payload: Any - - -@dataclass -class GetTask: - payload: Any - expected: int - schedule: str - task_id: str - next_run: datetime - tries: int - url: str - last_error_status_code: int - last_error_body: str - - -class PousseTaches: - def __init__(self, api_url: str, base_url: str) -> None: - self.api_url = api_url - self.base_url = base_url - - def push( - self, - payload: Any, - path: str, - expected: int = 200, - schedule: str = "", - delay: int = 0, - ) -> str: - # Encode our payload - p = base64.b64encode(json.dumps(payload).encode()).decode() - - # Queue/push it - resp = requests.post( - self.api_url, - json={ - "url": self.base_url + path, - "payload": p, - "expected": expected, - "schedule": schedule, - "delay": delay, - }, - ) - resp.raise_for_status() - - return resp.headers["Poussetaches-Task-ID"] - - def parse(self, req: flask.Request) -> Task: - if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY: - raise ValueError("Bad auth key") - - # Parse the "envelope" - envelope = json.loads(req.data) - print(req) - print(f"envelope={envelope!r}") - payload = json.loads(base64.b64decode(envelope["payload"])) - - return Task( - req_id=envelope["req_id"], tries=envelope["tries"], payload=payload - ) # type: ignore - - @staticmethod - def _expand_task(t: Dict[str, Any]) -> None: - try: - t["payload"] = json.loads(base64.b64decode(t["payload"])) - except json.JSONDecodeError: - t["payload"] = base64.b64decode(t["payload"]).decode() - - if t["last_error_body"]: - t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode() - - t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9)) - if t["last_run"]: - t["last_run"] = datetime.fromtimestamp(float(t["last_run"] / 1e9)) - else: - del t["last_run"] - - def _get(self, where: str) -> List[GetTask]: - out = [] - - resp = 
requests.get(self.api_url + f"/{where}") - resp.raise_for_status() - dat = resp.json() - for t in dat["tasks"]: - self._expand_task(t) - out.append( - GetTask( - task_id=t["id"], - payload=t["payload"], - expected=t["expected"], - schedule=t["schedule"], - tries=t["tries"], - url=t["url"], - last_error_status_code=t["last_error_status_code"], - last_error_body=t["last_error_body"], - next_run=t["next_run"], - ) - ) - - return out - - def get_cron(self) -> List[GetTask]: - return self._get("cron") - - def get_success(self) -> List[GetTask]: - return self._get("success") - - def get_waiting(self) -> List[GetTask]: - return self._get("waiting") - - def get_dead(self) -> List[GetTask]: - return self._get("dead") diff --git a/requirements.txt b/requirements.txt index 2d749d2..ebdac15 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +poussetaches python-dateutil libsass tornado<6.0.0 diff --git a/tasks.py b/tasks.py index 9515fe0..3f90f10 100644 --- a/tasks.py +++ b/tasks.py @@ -3,6 +3,7 @@ from datetime import datetime from datetime import timezone from poussetaches import PousseTaches + from utils import parse_datetime p = PousseTaches( diff --git a/templates/indieauth_flow.html b/templates/indieauth_flow.html index 7a9e313..aae64fc 100644 --- a/templates/indieauth_flow.html +++ b/templates/indieauth_flow.html @@ -20,7 +20,7 @@ -
+ {% if scopes %}

Scopes

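The vendored poussetaches.py client deleted above now comes from the poussetaches package added to requirements.txt, but the calling convention is unchanged: the Flask app enqueues work by pushing a base64-encoded payload to the poussetaches daemon, which later POSTs it back to the matching /task/... route and retries until it gets the expected 200 back. A minimal sketch of both sides under those assumptions; the /task/echo endpoint and the "tasks_example" blueprint name are made up for illustration, and the two URLs mirror the defaults used in app_utils.py:

import flask
from poussetaches import PousseTaches

# Same defaults as app_utils.py; both URLs are normally taken from the environment.
p = PousseTaches("http://localhost:7991", "http://localhost:5000")
blueprint = flask.Blueprint("tasks_example", __name__)

@blueprint.route("/task/echo", methods=["POST"])
def task_echo() -> str:
    task = p.parse(flask.request)  # validates the Poussetaches-Auth-Key header
    flask.current_app.logger.info(f"payload={task.payload!r} tries={task.tries}")
    return ""  # a successful response means "done, do not retry"

# Enqueuing, from anywhere in the app (cron-style schedules are also supported):
# p.push({"hello": "world"}, "/task/echo")
# p.push({}, "/task/cleanup", schedule="@every 1h")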
diff --git a/templates/layout.html b/templates/layout.html
index c1865f5..63a4600 100644
--- a/templates/layout.html
+++ b/templates/layout.html
@@ -31,7 +31,7 @@
 {% if unread_notifications_count %} ({{unread_notifications_count}}) {% endif %}
- • Lists
+ • Lists
 • Bookmarks
 • Lookup
 • Logout
diff --git a/templates/lists.html b/templates/lists.html
index 3fa80cf..106323c 100644
--- a/templates/lists.html
+++ b/templates/lists.html
@@ -12,7 +12,7 @@
 Lists and its members are private.
 New List
-
+
@@ -23,13 +23,13 @@
 Manage lists
 {% for l in lists %}
-{{ l.name }} {{ l.members | length }} members
+{{ l.name }} {{ l.members | length }} members

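The remaining template churn below follows directly from moving the admin views into blueprints/admin.py: once a view function is registered on a blueprint, its url_for() endpoint is qualified as "<blueprint name>.<function name>", so url_for('admin_stream') becomes url_for('admin.admin_stream'), and likewise for admin_notifications and admin_bookmarks. A minimal sketch of the Flask behaviour (a toy app, not microblog.pub code):

import flask

app = flask.Flask(__name__)
blueprint = flask.Blueprint("admin", __name__)

@blueprint.route("/admin/stream")
def admin_stream() -> str:
    return "stream"

app.register_blueprint(blueprint)

with app.test_request_context():
    # The blueprint-qualified endpoint name is what templates must pass to url_for().
    assert flask.url_for("admin.admin_stream") == "/admin/stream"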
diff --git a/templates/stream.html b/templates/stream.html
index 178a190..eccb207 100644
--- a/templates/stream.html
+++ b/templates/stream.html
@@ -1,12 +1,12 @@
 {% extends "layout.html" %}
 {% import 'utils.html' as utils %}
-{% block title %}{% if request.path == url_for('admin_stream') %}Stream{% else %}Notifications{% endif %} - {{ config.NAME }}{% endblock %}
+{% block title %}{% if request.path == url_for('admin.admin_stream') %}Stream{% else %}Notifications{% endif %} - {{ config.NAME }}{% endblock %}
 {% block content %}
 {% include "header.html" %}
-{% if request.path == url_for('admin_notifications') and unread_notifications_count %}
+{% if request.path == url_for('admin.admin_notifications') and unread_notifications_count %}
@@ -28,7 +28,7 @@
 {% if boost_actor %}
 {{ boost_actor.name or boost_actor.preferredUsername }} boosted
-{% if request.path == url_for('admin_notifications') %}
+{% if request.path == url_for('admin.admin_notifications') %}
 {% if item.meta.notification_unread %}new{% endif %} {{ (item.activity.published or item.meta.published) | format_timeago }}
 {% endif %}
diff --git a/templates/utils.html b/templates/utils.html
index b02e014..057c395 100644
--- a/templates/utils.html
+++ b/templates/utils.html
@@ -213,7 +213,7 @@
 {% endif %}
-{% if meta.bookmarked or request.path == url_for("admin_bookmarks") %}
+{% if meta.bookmarked or request.path == url_for("admin.admin_bookmarks") %}
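For completeness, the WebFinger handler that moved into blueprints/well_known.py keeps the usual discovery contract: a GET to /.well-known/webfinger with resource=acct:user@domain answers with a JRD document whose rel="self" link points at the ActivityPub actor. A small client-side sketch, with example.com standing in for a real instance:

import requests

resp = requests.get(
    "https://example.com/.well-known/webfinger",
    params={"resource": "acct:sampleuser@example.com"},
)
resp.raise_for_status()
jrd = resp.json()
# Pick the ActivityPub actor URL out of the JRD links.
actor_url = next(link["href"] for link in jrd["links"] if link["rel"] == "self")
print(actor_url)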