mirror of
https://git.sr.ht/~tsileo/microblog.pub
synced 2024-11-15 03:04:28 +00:00
Big cleanup part 3 (#59)
* Remove dead code and re-organize * Switch to new queries helper
This commit is contained in:
parent
a21121308f
commit
f902868250
11 changed files with 491 additions and 737 deletions
165
app.py
165
app.py
|
@ -3,8 +3,6 @@ import logging
|
|||
import os
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
|
@ -25,9 +23,7 @@ from little_boxes.activitypub import get_backend
|
|||
from little_boxes.errors import ActivityGoneError
|
||||
from little_boxes.errors import Error
|
||||
from little_boxes.httpsig import verify_request
|
||||
from little_boxes.webfinger import get_actor_url
|
||||
from little_boxes.webfinger import get_remote_follow_template
|
||||
from u2flib_server import u2f
|
||||
|
||||
import blueprints.admin
|
||||
import blueprints.indieauth
|
||||
|
@ -37,13 +33,17 @@ import config
|
|||
from blueprints.api import _api_required
|
||||
from blueprints.tasks import TaskError
|
||||
from config import DB
|
||||
from config import HEADERS
|
||||
from config import ID
|
||||
from config import ME
|
||||
from config import MEDIA_CACHE
|
||||
from config import VERSION
|
||||
from core import activitypub
|
||||
from core.activitypub import embed_collection
|
||||
from core import feed
|
||||
from core.activitypub import activity_from_doc
|
||||
from core.activitypub import activity_url
|
||||
from core.activitypub import post_to_inbox
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.activitypub import remove_context
|
||||
from core.db import find_one_activity
|
||||
from core.meta import Box
|
||||
from core.meta import MetaKey
|
||||
|
@ -51,19 +51,14 @@ from core.meta import _meta
|
|||
from core.meta import by_remote_id
|
||||
from core.meta import in_outbox
|
||||
from core.meta import is_public
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import _add_answers_to_question
|
||||
from core.shared import _build_thread
|
||||
from core.shared import _get_ip
|
||||
from core.shared import activity_url
|
||||
from core.shared import back
|
||||
from core.shared import csrf
|
||||
from core.shared import is_api_request
|
||||
from core.shared import jsonify
|
||||
from core.shared import login_required
|
||||
from core.shared import noindex
|
||||
from core.shared import paginated_query
|
||||
from core.shared import post_to_outbox
|
||||
from core.tasks import Tasks
|
||||
from utils import now
|
||||
from utils.key import get_secret_key
|
||||
from utils.template_filters import filters
|
||||
|
||||
|
@ -164,29 +159,6 @@ def set_x_powered_by(response):
|
|||
return response
|
||||
|
||||
|
||||
def jsonify(**data):
|
||||
if "@context" not in data:
|
||||
data["@context"] = config.DEFAULT_CTX
|
||||
return Response(
|
||||
response=json.dumps(data),
|
||||
headers={
|
||||
"Content-Type": "application/json"
|
||||
if app.debug
|
||||
else "application/activity+json"
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def is_api_request():
|
||||
h = request.headers.get("Accept")
|
||||
if h is None:
|
||||
return False
|
||||
h = h.split(",")[0]
|
||||
if h in HEADERS or h == "application/json":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@app.errorhandler(ValueError)
|
||||
def handle_value_error(error):
|
||||
logger.error(
|
||||
|
@ -271,12 +243,9 @@ def serve_uploads(oid, fname):
|
|||
return resp
|
||||
|
||||
|
||||
#######
|
||||
# Login
|
||||
|
||||
|
||||
@app.route("/remote_follow", methods=["GET", "POST"])
|
||||
def remote_follow():
|
||||
"""Form to allow visitor to perform the remote follow dance."""
|
||||
if request.method == "GET":
|
||||
return render_template("remote_follow.html")
|
||||
|
||||
|
@ -287,59 +256,8 @@ def remote_follow():
|
|||
return redirect(get_remote_follow_template(profile).format(uri=ID))
|
||||
|
||||
|
||||
@app.route("/authorize_follow", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def authorize_follow():
|
||||
if request.method == "GET":
|
||||
return render_template(
|
||||
"authorize_remote_follow.html", profile=request.args.get("profile")
|
||||
)
|
||||
|
||||
actor = get_actor_url(request.form.get("profile"))
|
||||
if not actor:
|
||||
abort(500)
|
||||
|
||||
q = {
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ActivityType.FOLLOW.value,
|
||||
"meta.undo": False,
|
||||
"activity.object": actor,
|
||||
}
|
||||
if DB.activities.count(q) > 0:
|
||||
return redirect("/following")
|
||||
|
||||
follow = ap.Follow(
|
||||
actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now()
|
||||
)
|
||||
post_to_outbox(follow)
|
||||
|
||||
return redirect("/following")
|
||||
|
||||
|
||||
@app.route("/u2f/register", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def u2f_register():
|
||||
# TODO(tsileo): ensure no duplicates
|
||||
if request.method == "GET":
|
||||
payload = u2f.begin_registration(ID)
|
||||
session["challenge"] = payload
|
||||
return render_template("u2f.html", payload=payload)
|
||||
else:
|
||||
resp = json.loads(request.form.get("resp"))
|
||||
device, device_cert = u2f.complete_registration(session["challenge"], resp)
|
||||
session["challenge"] = None
|
||||
DB.u2f.insert_one({"device": device, "cert": device_cert})
|
||||
session["logged_in"] = False
|
||||
return redirect("/login")
|
||||
|
||||
|
||||
#######
|
||||
# Activity pub routes
|
||||
@app.route("/drop_cache")
|
||||
@login_required
|
||||
def drop_cache():
|
||||
DB.actors.drop()
|
||||
return "Done"
|
||||
|
||||
|
||||
@app.route("/")
|
||||
|
@ -469,44 +387,6 @@ def note_by_id(note_id):
|
|||
)
|
||||
|
||||
|
||||
def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if raw_doc["activity"]["type"] != ActivityType.CREATE.value:
|
||||
return raw_doc
|
||||
|
||||
raw_doc["activity"]["object"]["replies"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_direct_reply", 0),
|
||||
f'{raw_doc["remote_id"]}/replies',
|
||||
)
|
||||
|
||||
raw_doc["activity"]["object"]["likes"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes'
|
||||
)
|
||||
|
||||
raw_doc["activity"]["object"]["shares"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares'
|
||||
)
|
||||
|
||||
return raw_doc
|
||||
|
||||
|
||||
def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if "@context" in activity:
|
||||
del activity["@context"]
|
||||
return activity
|
||||
|
||||
|
||||
def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
|
||||
raw_doc = add_extra_collection(raw_doc)
|
||||
activity = clean_activity(raw_doc["activity"])
|
||||
|
||||
# Handle Questions
|
||||
# TODO(tsileo): what about object embedded by ID/URL?
|
||||
_add_answers_to_question(raw_doc)
|
||||
if embed:
|
||||
return remove_context(activity)
|
||||
return activity
|
||||
|
||||
|
||||
@app.route("/outbox", methods=["GET", "POST"])
|
||||
def outbox():
|
||||
if request.method == "GET":
|
||||
|
@ -987,7 +867,7 @@ def liked():
|
|||
@app.route("/feed.json")
|
||||
def json_feed():
|
||||
return Response(
|
||||
response=json.dumps(activitypub.json_feed("/feed.json")),
|
||||
response=json.dumps(feed.json_feed("/feed.json")),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
|
@ -995,7 +875,7 @@ def json_feed():
|
|||
@app.route("/feed.atom")
|
||||
def atom_feed():
|
||||
return Response(
|
||||
response=activitypub.gen_feed().atom_str(),
|
||||
response=feed.gen_feed().atom_str(),
|
||||
headers={"Content-Type": "application/atom+xml"},
|
||||
)
|
||||
|
||||
|
@ -1003,27 +883,6 @@ def atom_feed():
|
|||
@app.route("/feed.rss")
|
||||
def rss_feed():
|
||||
return Response(
|
||||
response=activitypub.gen_feed().rss_str(),
|
||||
response=feed.gen_feed().rss_str(),
|
||||
headers={"Content-Type": "application/rss+xml"},
|
||||
)
|
||||
|
||||
|
||||
def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||
# Check for Block activity
|
||||
actor = activity.get_actor()
|
||||
if back.outbox_is_blocked(MY_PERSON, actor.id):
|
||||
app.logger.info(
|
||||
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
|
||||
)
|
||||
return
|
||||
|
||||
if back.inbox_check_duplicate(MY_PERSON, activity.id):
|
||||
# The activity is already in the inbox
|
||||
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
|
||||
return
|
||||
|
||||
back.save(Box.INBOX, activity)
|
||||
Tasks.process_new_activity(activity.id)
|
||||
|
||||
app.logger.info(f"spawning task for {activity!r}")
|
||||
Tasks.finish_post_to_inbox(activity.id)
|
||||
|
|
|
@ -15,6 +15,7 @@ from flask import request
|
|||
from flask import session
|
||||
from flask import url_for
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes.webfinger import get_actor_url
|
||||
from passlib.hash import bcrypt
|
||||
from u2flib_server import u2f
|
||||
|
||||
|
@ -23,6 +24,7 @@ from config import DB
|
|||
from config import ID
|
||||
from config import PASS
|
||||
from core.activitypub import Box
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import _build_thread
|
||||
from core.shared import _Response
|
||||
|
@ -31,7 +33,6 @@ from core.shared import login_required
|
|||
from core.shared import noindex
|
||||
from core.shared import p
|
||||
from core.shared import paginated_query
|
||||
from core.shared import post_to_outbox
|
||||
from utils import now
|
||||
from utils.lookup import lookup
|
||||
|
||||
|
@ -412,3 +413,49 @@ def admin_bookmarks() -> _Response:
|
|||
return render_template(
|
||||
tpl, inbox_data=inbox_data, older_than=older_than, newer_than=newer_than
|
||||
)
|
||||
|
||||
|
||||
@blueprint.route("/u2f/register", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def u2f_register():
|
||||
# TODO(tsileo): ensure no duplicates
|
||||
if request.method == "GET":
|
||||
payload = u2f.begin_registration(ID)
|
||||
session["challenge"] = payload
|
||||
return render_template("u2f.html", payload=payload)
|
||||
else:
|
||||
resp = json.loads(request.form.get("resp"))
|
||||
device, device_cert = u2f.complete_registration(session["challenge"], resp)
|
||||
session["challenge"] = None
|
||||
DB.u2f.insert_one({"device": device, "cert": device_cert})
|
||||
session["logged_in"] = False
|
||||
return redirect("/login")
|
||||
|
||||
|
||||
@blueprint.route("/authorize_follow", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def authorize_follow():
|
||||
if request.method == "GET":
|
||||
return render_template(
|
||||
"authorize_remote_follow.html", profile=request.args.get("profile")
|
||||
)
|
||||
|
||||
actor = get_actor_url(request.form.get("profile"))
|
||||
if not actor:
|
||||
abort(500)
|
||||
|
||||
q = {
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ap.ActivityType.FOLLOW.value,
|
||||
"meta.undo": False,
|
||||
"activity.object": actor,
|
||||
}
|
||||
if DB.activities.count(q) > 0:
|
||||
return redirect("/following")
|
||||
|
||||
follow = ap.Follow(
|
||||
actor=MY_PERSON.id, object=actor, to=[actor], cc=[ap.AS_PUBLIC], published=now()
|
||||
)
|
||||
post_to_outbox(follow)
|
||||
|
||||
return redirect("/following")
|
||||
|
|
|
@ -32,16 +32,16 @@ from config import ID
|
|||
from config import JWT
|
||||
from config import MEDIA_CACHE
|
||||
from config import _drop_db
|
||||
from core import activitypub
|
||||
from core import feed
|
||||
from core.activitypub import activity_url
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.meta import Box
|
||||
from core.meta import MetaKey
|
||||
from core.meta import _meta
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import _Response
|
||||
from core.shared import activity_url
|
||||
from core.shared import csrf
|
||||
from core.shared import login_required
|
||||
from core.shared import post_to_outbox
|
||||
from core.tasks import Tasks
|
||||
from utils import now
|
||||
|
||||
|
@ -587,7 +587,7 @@ def api_debug() -> _Response:
|
|||
def api_stream() -> _Response:
|
||||
return Response(
|
||||
response=json.dumps(
|
||||
activitypub.build_inbox_json_feed("/api/stream", request.args.get("cursor"))
|
||||
feed.build_inbox_json_feed("/api/stream", request.args.get("cursor"))
|
||||
),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
|
|
@ -17,17 +17,17 @@ import config
|
|||
from config import DB
|
||||
from core import gc
|
||||
from core.activitypub import Box
|
||||
from core.activitypub import _add_answers_to_question
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.inbox import process_inbox
|
||||
from core.meta import MetaKey
|
||||
from core.meta import _meta
|
||||
from core.notifications import set_inbox_flags
|
||||
from core.outbox import process_outbox
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import _add_answers_to_question
|
||||
from core.shared import _Response
|
||||
from core.shared import back
|
||||
from core.shared import p
|
||||
from core.shared import post_to_outbox
|
||||
from core.tasks import Tasks
|
||||
from utils import now
|
||||
from utils import opengraph
|
||||
|
|
|
@ -1,24 +1,26 @@
|
|||
import binascii
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from urllib.parse import urljoin
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from bson.objectid import ObjectId
|
||||
from cachetools import LRUCache
|
||||
from feedgen.feed import FeedGenerator
|
||||
from html2text import html2text
|
||||
from flask import url_for
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes import strtobool
|
||||
from little_boxes.activitypub import _to_list
|
||||
from little_boxes.activitypub import clean_activity
|
||||
from little_boxes.activitypub import format_datetime
|
||||
from little_boxes.backend import Backend
|
||||
from little_boxes.errors import ActivityGoneError
|
||||
from little_boxes.errors import Error
|
||||
from little_boxes.errors import NotAnActivityError
|
||||
|
||||
from config import BASE_URL
|
||||
from config import DB
|
||||
|
@ -26,7 +28,6 @@ from config import EXTRA_INBOXES
|
|||
from config import ID
|
||||
from config import ME
|
||||
from config import USER_AGENT
|
||||
from config import USERNAME
|
||||
from core.meta import Box
|
||||
from core.tasks import Tasks
|
||||
|
||||
|
@ -39,26 +40,6 @@ ACTORS_CACHE = LRUCache(maxsize=256)
|
|||
MY_PERSON = ap.Person(**ME)
|
||||
|
||||
|
||||
def _actor_to_meta(actor: ap.BaseActivity, with_inbox=False) -> Dict[str, Any]:
|
||||
meta = {
|
||||
"id": actor.id,
|
||||
"url": actor.url,
|
||||
"icon": actor.icon,
|
||||
"name": actor.name,
|
||||
"preferredUsername": actor.preferredUsername,
|
||||
}
|
||||
if with_inbox:
|
||||
meta.update(
|
||||
{
|
||||
"inbox": actor.inbox,
|
||||
"sharedInbox": actor._data.get("endpoints", {}).get("sharedInbox"),
|
||||
}
|
||||
)
|
||||
logger.debug(f"meta={meta}")
|
||||
|
||||
return meta
|
||||
|
||||
|
||||
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
|
||||
"""Helper for removing MongoDB's `_id` field."""
|
||||
doc = doc.copy()
|
||||
|
@ -67,17 +48,6 @@ def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
|
|||
return doc
|
||||
|
||||
|
||||
def ensure_it_is_me(f):
|
||||
"""Method decorator used to track the events fired during tests."""
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
if args[1].id != ME["id"]:
|
||||
raise Error("unexpected actor")
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def _answer_key(choice: str) -> str:
|
||||
h = hashlib.new("sha1")
|
||||
h.update(choice.encode())
|
||||
|
@ -96,23 +66,7 @@ def _is_local_reply(create: ap.Create) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
class MicroblogPubBackend(Backend):
|
||||
"""Implements a Little Boxes backend, backed by MongoDB."""
|
||||
|
||||
def base_url(self) -> str:
|
||||
return BASE_URL
|
||||
|
||||
def debug_mode(self) -> bool:
|
||||
return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))
|
||||
|
||||
def user_agent(self) -> str:
|
||||
"""Setup a custom user agent."""
|
||||
return USER_AGENT
|
||||
|
||||
def extra_inboxes(self) -> List[str]:
|
||||
return EXTRA_INBOXES
|
||||
|
||||
def save(self, box: Box, activity: ap.BaseActivity) -> None:
|
||||
def save(box: Box, activity: ap.BaseActivity) -> None:
|
||||
"""Custom helper for saving an activity to the DB."""
|
||||
visibility = ap.get_visibility(activity)
|
||||
is_public = False
|
||||
|
@ -153,6 +107,84 @@ class MicroblogPubBackend(Backend):
|
|||
}
|
||||
)
|
||||
|
||||
|
||||
def outbox_is_blocked(actor_id: str) -> bool:
|
||||
return bool(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ap.ActivityType.BLOCK.value,
|
||||
"activity.object": actor_id,
|
||||
"meta.undo": False,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def activity_url(item_id: str) -> str:
|
||||
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
|
||||
|
||||
|
||||
def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||
# Check for Block activity
|
||||
actor = activity.get_actor()
|
||||
if outbox_is_blocked(actor.id):
|
||||
logger.info(
|
||||
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
|
||||
)
|
||||
return
|
||||
|
||||
if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}):
|
||||
# The activity is already in the inbox
|
||||
logger.info(f"received duplicate activity {activity!r}, dropping it")
|
||||
return
|
||||
|
||||
save(Box.INBOX, activity)
|
||||
Tasks.process_new_activity(activity.id)
|
||||
|
||||
logger.info(f"spawning task for {activity!r}")
|
||||
Tasks.finish_post_to_inbox(activity.id)
|
||||
|
||||
|
||||
def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||
if activity.has_type(ap.CREATE_TYPES):
|
||||
activity = activity.build_create()
|
||||
|
||||
# Assign create a random ID
|
||||
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
|
||||
uri = activity_url(obj_id)
|
||||
activity._data["id"] = uri
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
activity._data["object"]["id"] = urljoin(
|
||||
BASE_URL, url_for("outbox_activity", item_id=obj_id)
|
||||
)
|
||||
activity._data["object"]["url"] = urljoin(
|
||||
BASE_URL, url_for("note_by_id", note_id=obj_id)
|
||||
)
|
||||
activity.reset_object_cache()
|
||||
|
||||
save(Box.OUTBOX, activity)
|
||||
Tasks.cache_actor(activity.id)
|
||||
Tasks.finish_post_to_outbox(activity.id)
|
||||
return activity.id
|
||||
|
||||
|
||||
class MicroblogPubBackend(Backend):
|
||||
"""Implements a Little Boxes backend, backed by MongoDB."""
|
||||
|
||||
def base_url(self) -> str:
|
||||
return BASE_URL
|
||||
|
||||
def debug_mode(self) -> bool:
|
||||
return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))
|
||||
|
||||
def user_agent(self) -> str:
|
||||
"""Setup a custom user agent."""
|
||||
return USER_AGENT
|
||||
|
||||
def extra_inboxes(self) -> List[str]:
|
||||
return EXTRA_INBOXES
|
||||
|
||||
def followers(self) -> List[str]:
|
||||
q = {
|
||||
"box": Box.INBOX.value,
|
||||
|
@ -195,19 +227,6 @@ class MicroblogPubBackend(Backend):
|
|||
|
||||
return super().parse_collection(payload, url)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
|
||||
return bool(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": ap.ActivityType.BLOCK.value,
|
||||
"activity.object": actor_id,
|
||||
"meta.undo": False,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901
|
||||
# Shortcut if the instance actor is fetched
|
||||
if iri == ME["id"]:
|
||||
|
@ -317,259 +336,9 @@ class MicroblogPubBackend(Backend):
|
|||
|
||||
return data
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
|
||||
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
|
||||
|
||||
def set_post_to_remote_inbox(self, cb):
|
||||
self.post_to_remote_inbox_cb = cb
|
||||
|
||||
@ensure_it_is_me
|
||||
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
|
||||
DB.activities.update_one(
|
||||
{"remote_id": follow.id}, {"$set": {"meta.undo": True}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None:
|
||||
DB.activities.update_one(
|
||||
{"remote_id": follow.id}, {"$set": {"meta.undo": True}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}},
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": -1}},
|
||||
)
|
||||
DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}})
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None:
|
||||
obj = like.get_object()
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
|
||||
)
|
||||
DB.activities.update_one({"remote_id": like.id}, {"$set": {"meta.undo": True}})
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
# TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else
|
||||
# or remove it?
|
||||
try:
|
||||
obj = announce.get_object()
|
||||
except NotAnActivityError:
|
||||
logger.exception(
|
||||
f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message'
|
||||
)
|
||||
return
|
||||
|
||||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": _actor_to_meta(obj.get_actor()),
|
||||
}
|
||||
},
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}}
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id}, {"$set": {"meta.undo": True}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": _actor_to_meta(obj.get_actor()),
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None:
|
||||
obj = announce.get_object()
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}}
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id}, {"$set": {"meta.undo": True}}
|
||||
)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
|
||||
obj_id = delete.get_object_id()
|
||||
logger.debug("delete object={obj_id}")
|
||||
try:
|
||||
obj = ap.fetch_remote_activity(obj_id)
|
||||
logger.info(f"inbox_delete handle_replies obj={obj!r}")
|
||||
in_reply_to = obj.get_in_reply_to() if obj.inReplyTo else None
|
||||
if obj.has_type(ap.CREATE_TYPES):
|
||||
in_reply_to = ap._get_id(
|
||||
DB.activities.find_one(
|
||||
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
|
||||
)["activity"]["object"].get("inReplyTo")
|
||||
)
|
||||
if in_reply_to:
|
||||
self._handle_replies_delete(as_actor, in_reply_to)
|
||||
except Exception:
|
||||
logger.exception(f"failed to handle delete replies for {obj_id}")
|
||||
|
||||
DB.activities.update_one(
|
||||
{"meta.object_id": obj_id, "type": "Create"},
|
||||
{"$set": {"meta.deleted": True}},
|
||||
)
|
||||
|
||||
# Foce undo other related activities
|
||||
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None:
|
||||
DB.activities.update(
|
||||
{"meta.object_id": delete.get_object_id()},
|
||||
{"$set": {"meta.deleted": True, "meta.undo": True}},
|
||||
)
|
||||
obj = delete.get_object()
|
||||
if delete.get_object().ACTIVITY_TYPE != ap.ActivityType.NOTE:
|
||||
obj = ap.parse_activity(
|
||||
DB.activities.find_one(
|
||||
{
|
||||
"activity.object.id": delete.get_object().id,
|
||||
"type": ap.ActivityType.CREATE.value,
|
||||
}
|
||||
)["activity"]
|
||||
).get_object()
|
||||
|
||||
self._handle_replies_delete(as_actor, obj.get_in_reply_to())
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None:
|
||||
obj = update.get_object()
|
||||
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$set": {"activity.object": obj.to_dict()}},
|
||||
)
|
||||
elif obj.has_type(ap.ActivityType.QUESTION):
|
||||
choices = obj._data.get("oneOf", obj.anyOf)
|
||||
total_replies = 0
|
||||
_set = {}
|
||||
for choice in choices:
|
||||
answer_key = _answer_key(choice["name"])
|
||||
cnt = choice["replies"]["totalItems"]
|
||||
total_replies += cnt
|
||||
_set[f"meta.question_answers.{answer_key}"] = cnt
|
||||
|
||||
_set["meta.question_replies"] = total_replies
|
||||
|
||||
DB.activities.update_one(
|
||||
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
|
||||
)
|
||||
# Also update the cached copies of the question (like Announce and Like)
|
||||
DB.activities.update_many(
|
||||
{"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}}
|
||||
)
|
||||
|
||||
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_update(self, as_actor: ap.Person, _update: ap.Update) -> None:
|
||||
obj = _update._data["object"]
|
||||
|
||||
update_prefix = "activity.object."
|
||||
update: Dict[str, Any] = {"$set": dict(), "$unset": dict()}
|
||||
update["$set"][f"{update_prefix}updated"] = (
|
||||
datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
|
||||
)
|
||||
for k, v in obj.items():
|
||||
if k in ["id", "type"]:
|
||||
continue
|
||||
if v is None:
|
||||
update["$unset"][f"{update_prefix}{k}"] = ""
|
||||
else:
|
||||
update["$set"][f"{update_prefix}{k}"] = v
|
||||
|
||||
if len(update["$unset"]) == 0:
|
||||
del update["$unset"]
|
||||
|
||||
print(f"updating note from outbox {obj!r} {update}")
|
||||
logger.info(f"updating note from outbox {obj!r} {update}")
|
||||
DB.activities.update_one({"activity.object.id": obj["id"]}, update)
|
||||
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
|
||||
# (create a new Update with the result of the update, and send it without saving it?)
|
||||
|
||||
@ensure_it_is_me
|
||||
def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
|
||||
obj = create.get_object()
|
||||
|
||||
# Flag the activity as a poll answer if needed
|
||||
print(f"POLL ANSWER ChECK {obj.get_in_reply_to()} {obj.name} {obj.content}")
|
||||
if obj.get_in_reply_to() and obj.name and not obj.content:
|
||||
DB.activities.update_one(
|
||||
{"remote_id": create.id}, {"$set": {"meta.poll_answer": True}}
|
||||
)
|
||||
|
||||
self._handle_replies(as_actor, create)
|
||||
|
||||
@ensure_it_is_me
|
||||
def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
|
||||
# If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the
|
||||
# local copy)
|
||||
question = create.get_object()
|
||||
if question.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(question)
|
||||
|
||||
self._handle_replies(as_actor, create)
|
||||
|
||||
@ensure_it_is_me
|
||||
def _handle_replies_delete(
|
||||
self, as_actor: ap.Person, in_reply_to: Optional[str]
|
||||
) -> None:
|
||||
|
@ -627,7 +396,6 @@ class MicroblogPubBackend(Backend):
|
|||
|
||||
return None
|
||||
|
||||
@ensure_it_is_me
|
||||
def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None:
|
||||
"""Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent"
|
||||
key to make it easy to query a whole thread."""
|
||||
|
@ -666,7 +434,7 @@ class MicroblogPubBackend(Backend):
|
|||
)
|
||||
if not creply:
|
||||
# It means the activity is not in the inbox, and not in the outbox, we want to save it
|
||||
self.save(Box.REPLIES, reply)
|
||||
save(Box.REPLIES, reply)
|
||||
new_threads.append(reply.id)
|
||||
# TODO(tsileo): parses the replies collection and import the replies?
|
||||
|
||||
|
@ -678,7 +446,7 @@ class MicroblogPubBackend(Backend):
|
|||
reply = ap.fetch_remote_activity(root_reply)
|
||||
q = {"activity.object.id": root_reply}
|
||||
if not DB.activities.count(q):
|
||||
self.save(Box.REPLIES, reply)
|
||||
save(Box.REPLIES, reply)
|
||||
new_threads.append(reply.id)
|
||||
|
||||
DB.activities.update_one(
|
||||
|
@ -690,118 +458,6 @@ class MicroblogPubBackend(Backend):
|
|||
)
|
||||
|
||||
|
||||
def gen_feed():
|
||||
fg = FeedGenerator()
|
||||
fg.id(f"{ID}")
|
||||
fg.title(f"{USERNAME} notes")
|
||||
fg.author({"name": USERNAME, "email": "t@a4.io"})
|
||||
fg.link(href=ID, rel="alternate")
|
||||
fg.description(f"{USERNAME} notes")
|
||||
fg.logo(ME.get("icon", {}).get("url"))
|
||||
fg.language("en")
|
||||
for item in DB.activities.find(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"meta.public": True,
|
||||
},
|
||||
limit=10,
|
||||
).sort("_id", -1):
|
||||
fe = fg.add_entry()
|
||||
fe.id(item["activity"]["object"].get("url"))
|
||||
fe.link(href=item["activity"]["object"].get("url"))
|
||||
fe.title(item["activity"]["object"]["content"])
|
||||
fe.description(item["activity"]["object"]["content"])
|
||||
return fg
|
||||
|
||||
|
||||
def json_feed(path: str) -> Dict[str, Any]:
|
||||
"""JSON Feed (https://jsonfeed.org/) document."""
|
||||
data = []
|
||||
for item in DB.activities.find(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"meta.public": True,
|
||||
},
|
||||
limit=10,
|
||||
).sort("_id", -1):
|
||||
data.append(
|
||||
{
|
||||
"id": item["activity"]["id"],
|
||||
"url": item["activity"]["object"].get("url"),
|
||||
"content_html": item["activity"]["object"]["content"],
|
||||
"content_text": html2text(item["activity"]["object"]["content"]),
|
||||
"date_published": item["activity"]["object"].get("published"),
|
||||
}
|
||||
)
|
||||
return {
|
||||
"version": "https://jsonfeed.org/version/1",
|
||||
"user_comment": (
|
||||
"This is a microblog feed. You can add this to your feed reader using the following URL: "
|
||||
+ ID
|
||||
+ path
|
||||
),
|
||||
"title": USERNAME,
|
||||
"home_page_url": ID,
|
||||
"feed_url": ID + path,
|
||||
"author": {
|
||||
"name": USERNAME,
|
||||
"url": ID,
|
||||
"avatar": ME.get("icon", {}).get("url"),
|
||||
},
|
||||
"items": data,
|
||||
}
|
||||
|
||||
|
||||
def build_inbox_json_feed(
|
||||
path: str, request_cursor: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Build a JSON feed from the inbox activities."""
|
||||
data = []
|
||||
cursor = None
|
||||
|
||||
q: Dict[str, Any] = {
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"box": Box.INBOX.value,
|
||||
}
|
||||
if request_cursor:
|
||||
q["_id"] = {"$lt": request_cursor}
|
||||
|
||||
for item in DB.activities.find(q, limit=50).sort("_id", -1):
|
||||
actor = ap.get_backend().fetch_iri(item["activity"]["actor"])
|
||||
data.append(
|
||||
{
|
||||
"id": item["activity"]["id"],
|
||||
"url": item["activity"]["object"].get("url"),
|
||||
"content_html": item["activity"]["object"]["content"],
|
||||
"content_text": html2text(item["activity"]["object"]["content"]),
|
||||
"date_published": item["activity"]["object"].get("published"),
|
||||
"author": {
|
||||
"name": actor.get("name", actor.get("preferredUsername")),
|
||||
"url": actor.get("url"),
|
||||
"avatar": actor.get("icon", {}).get("url"),
|
||||
},
|
||||
}
|
||||
)
|
||||
cursor = str(item["_id"])
|
||||
|
||||
resp = {
|
||||
"version": "https://jsonfeed.org/version/1",
|
||||
"title": f"{USERNAME}'s stream",
|
||||
"home_page_url": ID,
|
||||
"feed_url": ID + path,
|
||||
"items": data,
|
||||
}
|
||||
if cursor and len(data) == 50:
|
||||
resp["next_url"] = ID + path + "?cursor=" + cursor
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def embed_collection(total_items, first_page_id):
|
||||
"""Helper creating a root OrderedCollection with a link to the first page."""
|
||||
return {
|
||||
|
@ -905,3 +561,60 @@ def build_ordered_collection(
|
|||
# XXX(tsileo): implements prev with prev=<first item cursor>?
|
||||
|
||||
return resp
|
||||
|
||||
|
||||
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
|
||||
activity = raw_doc["activity"]
|
||||
if (
|
||||
ap._has_type(activity["type"], ap.ActivityType.CREATE)
|
||||
and "object" in activity
|
||||
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
|
||||
):
|
||||
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
|
||||
choice["replies"] = {
|
||||
"type": ap.ActivityType.COLLECTION.value,
|
||||
"totalItems": raw_doc["meta"]
|
||||
.get("question_answers", {})
|
||||
.get(_answer_key(choice["name"]), 0),
|
||||
}
|
||||
now = datetime.now(timezone.utc)
|
||||
if format_datetime(now) >= activity["object"]["endTime"]:
|
||||
activity["object"]["closed"] = activity["object"]["endTime"]
|
||||
|
||||
|
||||
def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if raw_doc["activity"]["type"] != ap.ActivityType.CREATE.value:
|
||||
return raw_doc
|
||||
|
||||
raw_doc["activity"]["object"]["replies"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_direct_reply", 0),
|
||||
f'{raw_doc["remote_id"]}/replies',
|
||||
)
|
||||
|
||||
raw_doc["activity"]["object"]["likes"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes'
|
||||
)
|
||||
|
||||
raw_doc["activity"]["object"]["shares"] = embed_collection(
|
||||
raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares'
|
||||
)
|
||||
|
||||
return raw_doc
|
||||
|
||||
|
||||
def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if "@context" in activity:
|
||||
del activity["@context"]
|
||||
return activity
|
||||
|
||||
|
||||
def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
|
||||
raw_doc = add_extra_collection(raw_doc)
|
||||
activity = clean_activity(raw_doc["activity"])
|
||||
|
||||
# Handle Questions
|
||||
# TODO(tsileo): what about object embedded by ID/URL?
|
||||
_add_answers_to_question(raw_doc)
|
||||
if embed:
|
||||
return remove_context(activity)
|
||||
return activity
|
||||
|
|
125
core/feed.py
Normal file
125
core/feed.py
Normal file
|
@ -0,0 +1,125 @@
|
|||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
|
||||
from feedgen.feed import FeedGenerator
|
||||
from html2text import html2text
|
||||
from little_boxes import activitypub as ap
|
||||
|
||||
from config import ID
|
||||
from config import ME
|
||||
from config import USERNAME
|
||||
from core.db import DB
|
||||
from core.meta import Box
|
||||
|
||||
|
||||
def gen_feed():
|
||||
fg = FeedGenerator()
|
||||
fg.id(f"{ID}")
|
||||
fg.title(f"{USERNAME} notes")
|
||||
fg.author({"name": USERNAME, "email": "t@a4.io"})
|
||||
fg.link(href=ID, rel="alternate")
|
||||
fg.description(f"{USERNAME} notes")
|
||||
fg.logo(ME.get("icon", {}).get("url"))
|
||||
fg.language("en")
|
||||
for item in DB.activities.find(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"meta.public": True,
|
||||
},
|
||||
limit=10,
|
||||
).sort("_id", -1):
|
||||
fe = fg.add_entry()
|
||||
fe.id(item["activity"]["object"].get("url"))
|
||||
fe.link(href=item["activity"]["object"].get("url"))
|
||||
fe.title(item["activity"]["object"]["content"])
|
||||
fe.description(item["activity"]["object"]["content"])
|
||||
return fg
|
||||
|
||||
|
||||
def json_feed(path: str) -> Dict[str, Any]:
|
||||
"""JSON Feed (https://jsonfeed.org/) document."""
|
||||
data = []
|
||||
for item in DB.activities.find(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"meta.public": True,
|
||||
},
|
||||
limit=10,
|
||||
).sort("_id", -1):
|
||||
data.append(
|
||||
{
|
||||
"id": item["activity"]["id"],
|
||||
"url": item["activity"]["object"].get("url"),
|
||||
"content_html": item["activity"]["object"]["content"],
|
||||
"content_text": html2text(item["activity"]["object"]["content"]),
|
||||
"date_published": item["activity"]["object"].get("published"),
|
||||
}
|
||||
)
|
||||
return {
|
||||
"version": "https://jsonfeed.org/version/1",
|
||||
"user_comment": (
|
||||
"This is a microblog feed. You can add this to your feed reader using the following URL: "
|
||||
+ ID
|
||||
+ path
|
||||
),
|
||||
"title": USERNAME,
|
||||
"home_page_url": ID,
|
||||
"feed_url": ID + path,
|
||||
"author": {
|
||||
"name": USERNAME,
|
||||
"url": ID,
|
||||
"avatar": ME.get("icon", {}).get("url"),
|
||||
},
|
||||
"items": data,
|
||||
}
|
||||
|
||||
|
||||
def build_inbox_json_feed(
|
||||
path: str, request_cursor: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Build a JSON feed from the inbox activities."""
|
||||
data = []
|
||||
cursor = None
|
||||
|
||||
q: Dict[str, Any] = {
|
||||
"type": "Create",
|
||||
"meta.deleted": False,
|
||||
"box": Box.INBOX.value,
|
||||
}
|
||||
if request_cursor:
|
||||
q["_id"] = {"$lt": request_cursor}
|
||||
|
||||
for item in DB.activities.find(q, limit=50).sort("_id", -1):
|
||||
actor = ap.get_backend().fetch_iri(item["activity"]["actor"])
|
||||
data.append(
|
||||
{
|
||||
"id": item["activity"]["id"],
|
||||
"url": item["activity"]["object"].get("url"),
|
||||
"content_html": item["activity"]["object"]["content"],
|
||||
"content_text": html2text(item["activity"]["object"]["content"]),
|
||||
"date_published": item["activity"]["object"].get("published"),
|
||||
"author": {
|
||||
"name": actor.get("name", actor.get("preferredUsername")),
|
||||
"url": actor.get("url"),
|
||||
"avatar": actor.get("icon", {}).get("url"),
|
||||
},
|
||||
}
|
||||
)
|
||||
cursor = str(item["_id"])
|
||||
|
||||
resp = {
|
||||
"version": "https://jsonfeed.org/version/1",
|
||||
"title": f"{USERNAME}'s stream",
|
||||
"home_page_url": ID,
|
||||
"feed_url": ID + path,
|
||||
"items": data,
|
||||
}
|
||||
if cursor and len(data) == 50:
|
||||
resp["next_url"] = ID + path + "?cursor=" + cursor
|
||||
|
||||
return resp
|
|
@ -8,11 +8,18 @@ from little_boxes.errors import NotAnActivityError
|
|||
|
||||
import config
|
||||
from core.activitypub import _answer_key
|
||||
from core.activitypub import post_to_outbox
|
||||
from core.db import DB
|
||||
from core.meta import Box
|
||||
from core.db import update_one_activity
|
||||
from core.meta import MetaKey
|
||||
from core.meta import by_object_id
|
||||
from core.meta import by_remote_id
|
||||
from core.meta import by_type
|
||||
from core.meta import in_inbox
|
||||
from core.meta import inc
|
||||
from core.meta import upsert
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import back
|
||||
from core.shared import post_to_outbox
|
||||
from core.tasks import Tasks
|
||||
from utils import now
|
||||
|
||||
|
@ -47,12 +54,13 @@ def _delete_process_inbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
|
|||
except Exception:
|
||||
_logger.exception(f"failed to handle delete replies for {obj_id}")
|
||||
|
||||
DB.activities.update_one(
|
||||
{"meta.object_id": obj_id, "type": "Create"}, {"$set": {"meta.deleted": True}}
|
||||
update_one_activity(
|
||||
{**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)},
|
||||
upsert({MetaKey.DELETED: True}),
|
||||
)
|
||||
|
||||
# Foce undo other related activities
|
||||
DB.activities.update({"meta.object_id": obj_id}, {"$set": {"meta.undo": True}})
|
||||
DB.activities.update(by_object_id(obj_id), upsert({MetaKey.UNDO: True}))
|
||||
|
||||
|
||||
@process_inbox.register
|
||||
|
@ -60,7 +68,7 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
|
|||
_logger.info(f"process_inbox activity={update!r}")
|
||||
obj = update.get_object()
|
||||
if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE:
|
||||
DB.activities.update_one(
|
||||
update_one_activity(
|
||||
{"activity.object.id": obj.id}, {"$set": {"activity.object": obj.to_dict()}}
|
||||
)
|
||||
elif obj.has_type(ap.ActivityType.QUESTION):
|
||||
|
@ -75,12 +83,10 @@ def _update_process_inbox(update: ap.Update, new_meta: _NewMeta) -> None:
|
|||
|
||||
_set["meta.question_replies"] = total_replies
|
||||
|
||||
DB.activities.update_one(
|
||||
{"box": Box.INBOX.value, "activity.object.id": obj.id}, {"$set": _set}
|
||||
)
|
||||
update_one_activity({**in_inbox(), **by_object_id(obj.id)}, {"$set": _set})
|
||||
# Also update the cached copies of the question (like Announce and Like)
|
||||
DB.activities.update_many(
|
||||
{"meta.object.id": obj.id}, {"$set": {"meta.object": obj.to_dict()}}
|
||||
by_object_id(obj.id), upsert({MetaKey.OBJECT: obj.to_dict()})
|
||||
)
|
||||
|
||||
# FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor
|
||||
|
@ -114,17 +120,18 @@ def _announce_process_inbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
|
|||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
update_one_activity(
|
||||
by_remote_id(announce.id),
|
||||
upsert(
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": obj.get_actor().to_dict(embed=True),
|
||||
MetaKey.OBJECT: obj.to_dict(embed=True),
|
||||
MetaKey.OBJECT_ACTOR: obj.get_actor().to_dict(embed=True),
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": 1}}
|
||||
update_one_activity(
|
||||
{**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)},
|
||||
inc(MetaKey.COUNT_BOOST, 1),
|
||||
)
|
||||
|
||||
|
||||
|
@ -133,9 +140,9 @@ def _like_process_inbox(like: ap.Like, new_meta: _NewMeta) -> None:
|
|||
_logger.info(f"process_inbox activity={like!r}")
|
||||
obj = like.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{"box": Box.OUTBOX.value, "activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}},
|
||||
update_one_activity(
|
||||
{**by_type(ap.ActivityType.CREATE), **by_object_id(obj.id)},
|
||||
inc(MetaKey.COUNT_LIKE, 1),
|
||||
)
|
||||
|
||||
|
||||
|
@ -161,21 +168,23 @@ def _follow_process_inbox(activity: ap.Follow, new_meta: _NewMeta) -> None:
|
|||
@process_inbox.register
|
||||
def _undo_process_inbox(activity: ap.Undo, new_meta: _NewMeta) -> None:
|
||||
_logger.info(f"process_inbox activity={activity!r}")
|
||||
# Fetch the object that's been undo'ed
|
||||
obj = activity.get_object()
|
||||
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||
|
||||
# Set the undo flag on the mentionned activity
|
||||
update_one_activity(by_remote_id(obj.id), upsert({MetaKey.UNDO: True}))
|
||||
|
||||
# Handle cached counters
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{
|
||||
"box": Box.OUTBOX.value,
|
||||
"meta.object_id": obj.get_object_id(),
|
||||
"type": ap.ActivityType.CREATE.value,
|
||||
},
|
||||
{"$inc": {"meta.count_like": -1}},
|
||||
update_one_activity(
|
||||
{**by_object_id(obj.get_object_id()), **by_type(ap.ActivityType.CREATE)},
|
||||
inc(MetaKey.COUNT_LIKE, -1),
|
||||
)
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
announced = obj.get_object()
|
||||
# Update the meta counter if the object is published by the server
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": announced.id}, {"$inc": {"meta.count_boost": -1}}
|
||||
update_one_activity(
|
||||
{**by_type(ap.ActivityType.CREATE), **by_object_id(announced.id)},
|
||||
inc(MetaKey.COUNT_BOOST, -1),
|
||||
)
|
||||
|
|
|
@ -9,13 +9,16 @@ def create_indexes():
|
|||
if "trash" not in DB.collection_names():
|
||||
DB.create_collection("trash", capped=True, size=50 << 20) # 50 MB
|
||||
|
||||
if "activities" in DB.collection_names():
|
||||
DB.command("compact", "activities")
|
||||
|
||||
DB.activities.create_index([(_meta(MetaKey.NOTIFICATION), pymongo.ASCENDING)])
|
||||
DB.activities.create_index(
|
||||
[(_meta(MetaKey.NOTIFICATION_UNREAD), pymongo.ASCENDING)]
|
||||
)
|
||||
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([("meta.actor_id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([("meta.object_id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([("meta.thread_root_parent", pymongo.ASCENDING)])
|
||||
DB.activities.create_index(
|
||||
[
|
||||
|
@ -26,14 +29,9 @@ def create_indexes():
|
|||
DB.activities.create_index(
|
||||
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
|
||||
)
|
||||
DB.cache2.create_index(
|
||||
[
|
||||
("path", pymongo.ASCENDING),
|
||||
("type", pymongo.ASCENDING),
|
||||
("arg", pymongo.ASCENDING),
|
||||
]
|
||||
DB.activities.create_index(
|
||||
[("meta.object_id", pymongo.ASCENDING), ("type", pymongo.ASCENDING)]
|
||||
)
|
||||
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
|
||||
|
||||
# Index for the block query
|
||||
DB.activities.create_index(
|
||||
|
|
25
core/meta.py
25
core/meta.py
|
@ -27,9 +27,17 @@ class MetaKey(Enum):
|
|||
PUBLISHED = "published"
|
||||
GC_KEEP = "gc_keep"
|
||||
OBJECT = "object"
|
||||
OBJECT_ID = "object_id"
|
||||
OBJECT_ACTOR = "object_actor"
|
||||
PUBLIC = "public"
|
||||
|
||||
DELETED = "deleted"
|
||||
BOOSTED = "boosted"
|
||||
LIKED = "liked"
|
||||
|
||||
COUNT_LIKE = "count_like"
|
||||
COUNT_BOOST = "count_boost"
|
||||
|
||||
|
||||
def _meta(mk: MetaKey) -> str:
|
||||
return f"meta.{mk.value}"
|
||||
|
@ -59,5 +67,22 @@ def by_actor(actor: ap.BaseActivity) -> _SubQuery:
|
|||
return {_meta(MetaKey.ACTOR_ID): actor.id}
|
||||
|
||||
|
||||
def by_object_id(object_id: str) -> _SubQuery:
|
||||
return {_meta(MetaKey.OBJECT_ID): object_id}
|
||||
|
||||
|
||||
def is_public() -> _SubQuery:
|
||||
return {_meta(MetaKey.PUBLIC): True}
|
||||
|
||||
|
||||
def inc(mk: MetaKey, val: int) -> _SubQuery:
|
||||
return {"$inc": {_meta(mk): val}}
|
||||
|
||||
|
||||
def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
|
||||
sq: Dict[str, Any] = {}
|
||||
|
||||
for mk, val in data.items():
|
||||
sq[_meta(mk)] = val
|
||||
|
||||
return {"$set": sq}
|
||||
|
|
|
@ -6,9 +6,15 @@ from typing import Dict
|
|||
|
||||
from little_boxes import activitypub as ap
|
||||
|
||||
from core.db import DB
|
||||
from core.db import find_one_activity
|
||||
from core.db import update_many_activities
|
||||
from core.db import update_one_activity
|
||||
from core.meta import MetaKey
|
||||
from core.meta import by_object_id
|
||||
from core.meta import by_remote_id
|
||||
from core.meta import by_type
|
||||
from core.meta import inc
|
||||
from core.meta import upsert
|
||||
from core.shared import MY_PERSON
|
||||
from core.shared import back
|
||||
from core.tasks import Tasks
|
||||
|
@ -31,13 +37,13 @@ def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
|
|||
|
||||
# Flag everything referencing the deleted object as deleted (except the Delete activity itself)
|
||||
update_many_activities(
|
||||
{"meta.object_id": obj_id, "remote_id": {"$ne": delete.id}},
|
||||
{"$set": {"meta.deleted": True, "meta.undo": True}},
|
||||
{**by_object_id(obj_id), "remote_id": {"$ne": delete.id}},
|
||||
upsert({MetaKey.DELETED: True, MetaKey.UNDO: True}),
|
||||
)
|
||||
|
||||
# If the deleted activity was in DB, decrease some threads-related counter
|
||||
data = find_one_activity(
|
||||
{"meta.object_id": obj_id, "type": ap.ActivityType.CREATE.value}
|
||||
{**by_object_id(obj_id), **by_type(ap.ActivityType.CREATE)}
|
||||
)
|
||||
_logger.info(f"found local copy of deleted activity: {data}")
|
||||
if data:
|
||||
|
@ -45,8 +51,8 @@ def _delete_process_outbox(delete: ap.Delete, new_meta: _NewMeta) -> None:
|
|||
_logger.info(f"obj={obj!r}")
|
||||
in_reply_to = obj.get_in_reply_to()
|
||||
if in_reply_to:
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": in_reply_to},
|
||||
update_one_activity(
|
||||
{**by_type(ap.ActivityType.CREATE), **by_object_id(in_reply_to)},
|
||||
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
|
||||
)
|
||||
|
||||
|
@ -74,7 +80,7 @@ def _update_process_outbox(update: ap.Update, new_meta: _NewMeta) -> None:
|
|||
del to_update["$unset"]
|
||||
|
||||
_logger.info(f"updating note from outbox {obj!r} {to_update}")
|
||||
DB.activities.update_one({"activity.object.id": obj["id"]}, to_update)
|
||||
update_one_activity({"activity.object.id": obj["id"]}, to_update)
|
||||
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
|
||||
# (create a new Update with the result of the update, and send it without saving it?)
|
||||
|
||||
|
@ -93,18 +99,19 @@ def _announce_process_outbox(announce: ap.Announce, new_meta: _NewMeta) -> None:
|
|||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"remote_id": announce.id},
|
||||
update_one_activity(
|
||||
by_remote_id(announce.id),
|
||||
upsert(
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": obj.get_actor().to_dict(embed=True),
|
||||
MetaKey.OBJECT: obj.to_dict(embed=True),
|
||||
MetaKey.OBJECT_ACTOR: obj.get_actor().to_dict(embed=True),
|
||||
}
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}}
|
||||
update_one_activity(
|
||||
{**by_object_id(obj.id), **by_type(ap.ActivityType.CREATE)},
|
||||
upsert({MetaKey.BOOSTED: announce.id}),
|
||||
)
|
||||
|
||||
|
||||
|
@ -116,9 +123,9 @@ def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None:
|
|||
if obj.has_type(ap.ActivityType.QUESTION):
|
||||
Tasks.fetch_remote_question(obj)
|
||||
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": obj.id},
|
||||
{"$inc": {"meta.count_like": 1}, "$set": {"meta.liked": like.id}},
|
||||
update_one_activity(
|
||||
{**by_object_id(obj.id), **by_type(ap.ActivityType.CREATE)},
|
||||
{**inc(MetaKey.COUNT_LIKE, 1), **upsert({MetaKey.LIKED: like.id})},
|
||||
)
|
||||
|
||||
|
||||
|
@ -126,20 +133,21 @@ def _like_process_outbox(like: ap.Like, new_meta: _NewMeta) -> None:
|
|||
def _undo_process_outbox(undo: ap.Undo, new_meta: _NewMeta) -> None:
|
||||
_logger.info(f"process_outbox activity={undo!r}")
|
||||
obj = undo.get_object()
|
||||
DB.activities.update_one({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||
update_one_activity({"remote_id": obj.id}, {"$set": {"meta.undo": True}})
|
||||
|
||||
# Undo Like
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
liked = obj.get_object_id()
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": liked},
|
||||
{"$inc": {"meta.count_like": -1}, "$set": {"meta.liked": False}},
|
||||
update_one_activity(
|
||||
{**by_object_id(liked), **by_type(ap.ActivityType.CREATE)},
|
||||
{**inc(MetaKey.COUNT_LIKE, -1), **upsert({MetaKey.LIKED: False})},
|
||||
)
|
||||
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
announced = obj.get_object_id()
|
||||
DB.activities.update_one(
|
||||
{"activity.object.id": announced}, {"$set": {"meta.boosted": False}}
|
||||
update_one_activity(
|
||||
{**by_object_id(announced), **by_type(ap.ActivityType.CREATE)},
|
||||
upsert({MetaKey.BOOSTED: False}),
|
||||
)
|
||||
|
||||
# Undo Follow (undo new following)
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
import binascii
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from datetime import timezone
|
||||
from functools import wraps
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import flask
|
||||
from bson.objectid import ObjectId
|
||||
from flask import Response
|
||||
from flask import current_app as app
|
||||
from flask import redirect
|
||||
from flask import request
|
||||
|
@ -16,16 +13,12 @@ from flask import session
|
|||
from flask import url_for
|
||||
from flask_wtf.csrf import CSRFProtect
|
||||
from little_boxes import activitypub as ap
|
||||
from little_boxes.activitypub import format_datetime
|
||||
from poussetaches import PousseTaches
|
||||
|
||||
from config import BASE_URL
|
||||
import config
|
||||
from config import DB
|
||||
from config import ME
|
||||
from core import activitypub
|
||||
from core.activitypub import _answer_key
|
||||
from core.meta import Box
|
||||
from core.tasks import Tasks
|
||||
|
||||
# _Response = Union[flask.Response, werkzeug.wrappers.Response, str, Any]
|
||||
_Response = Any
|
||||
|
@ -45,6 +38,29 @@ ap.use_backend(back)
|
|||
MY_PERSON = ap.Person(**ME)
|
||||
|
||||
|
||||
def jsonify(**data):
|
||||
if "@context" not in data:
|
||||
data["@context"] = config.DEFAULT_CTX
|
||||
return Response(
|
||||
response=json.dumps(data),
|
||||
headers={
|
||||
"Content-Type": "application/json"
|
||||
if app.debug
|
||||
else "application/activity+json"
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def is_api_request():
|
||||
h = request.headers.get("Accept")
|
||||
if h is None:
|
||||
return False
|
||||
h = h.split(",")[0]
|
||||
if h in config.HEADERS or h == "application/json":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def add_response_headers(headers={}):
|
||||
"""This decorator adds the headers passed in to the response"""
|
||||
|
||||
|
@ -94,33 +110,6 @@ def _get_ip():
|
|||
return ip, geoip
|
||||
|
||||
|
||||
def activity_url(item_id: str) -> str:
|
||||
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
|
||||
|
||||
|
||||
def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||
if activity.has_type(ap.CREATE_TYPES):
|
||||
activity = activity.build_create()
|
||||
|
||||
# Assign create a random ID
|
||||
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
|
||||
uri = activity_url(obj_id)
|
||||
activity._data["id"] = uri
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
activity._data["object"]["id"] = urljoin(
|
||||
BASE_URL, url_for("outbox_activity", item_id=obj_id)
|
||||
)
|
||||
activity._data["object"]["url"] = urljoin(
|
||||
BASE_URL, url_for("note_by_id", note_id=obj_id)
|
||||
)
|
||||
activity.reset_object_cache()
|
||||
|
||||
back.save(Box.OUTBOX, activity)
|
||||
Tasks.cache_actor(activity.id)
|
||||
Tasks.finish_post_to_outbox(activity.id)
|
||||
return activity.id
|
||||
|
||||
|
||||
def _build_thread(data, include_children=True): # noqa: C901
|
||||
data["_requested"] = True
|
||||
app.logger.info(f"_build_thread({data!r})")
|
||||
|
@ -225,22 +214,3 @@ def paginated_query(db, q, limit=25, sort_key="_id"):
|
|||
older_than = str(outbox_data[-1]["_id"])
|
||||
|
||||
return outbox_data, older_than, newer_than
|
||||
|
||||
|
||||
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
|
||||
activity = raw_doc["activity"]
|
||||
if (
|
||||
ap._has_type(activity["type"], ap.ActivityType.CREATE)
|
||||
and "object" in activity
|
||||
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
|
||||
):
|
||||
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
|
||||
choice["replies"] = {
|
||||
"type": ap.ActivityType.COLLECTION.value,
|
||||
"totalItems": raw_doc["meta"]
|
||||
.get("question_answers", {})
|
||||
.get(_answer_key(choice["name"]), 0),
|
||||
}
|
||||
now = datetime.now(timezone.utc)
|
||||
if format_datetime(now) >= activity["object"]["endTime"]:
|
||||
activity["object"]["closed"] = activity["object"]["endTime"]
|
||||
|
|
Loading…
Reference in a new issue