diff --git a/Makefile b/Makefile index daad832..1cd4e26 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,9 @@ reload-dev: # docker build . -t microblogpub:latest docker-compose -f docker-compose-dev.yml up -d --force-recreate +update-poussetaches: + git clone https://github.com/tsileo/poussetaches.git tmp_poussetaches && cd tmp_poussetaches && docker build . -t poussetaches:latest && cd - && rm -rf tmp_poussetaches + update: git pull docker build . -t microblogpub:latest diff --git a/activitypub.py b/activitypub.py index 9185745..20b5f4c 100644 --- a/activitypub.py +++ b/activitypub.py @@ -1,6 +1,5 @@ import logging import os -import json from datetime import datetime from enum import Enum from typing import Any @@ -79,6 +78,52 @@ class Box(Enum): REPLIES = "replies" +def save(box: Box, activity: ap.BaseActivity) -> None: + """Custom helper for saving an activity to the DB.""" + DB.activities.insert_one( + { + "box": box.value, + "activity": activity.to_dict(), + "type": _to_list(activity.type), + "remote_id": activity.id, + "meta": {"undo": False, "deleted": False}, + } + ) + + +def followers() -> List[str]: + q = { + "box": Box.INBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + return [doc["activity"]["actor"] for doc in DB.activities.find(q)] + + +def following() -> List[str]: + q = { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + return [doc["activity"]["object"] for doc in DB.activities.find(q)] + + +def followers_as_recipients() -> List[str]: + q = { + "box": Box.INBOX.value, + "type": ap.ActivityType.FOLLOW.value, + "meta.undo": False, + } + recipients = [] + for doc in DB.activities.find(q): + recipients.append( + doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"] + ) + + return list(set(recipients)) + + class MicroblogPubBackend(Backend): """Implements a Little Boxes backend, backed by MongoDB.""" @@ -104,73 +149,18 @@ class MicroblogPubBackend(Backend): """URL for activity link.""" return f"{BASE_URL}/note/{obj_id}" - def save(self, box: Box, activity: ap.BaseActivity) -> None: - """Custom helper for saving an activity to the DB.""" - DB.activities.insert_one( - { - "box": box.value, - "activity": activity.to_dict(), - "type": _to_list(activity.type), - "remote_id": activity.id, - "meta": {"undo": False, "deleted": False}, - } - ) - - def followers(self) -> List[str]: - q = { - "box": Box.INBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - return [doc["activity"]["actor"] for doc in DB.activities.find(q)] - - def followers_as_recipients(self) -> List[str]: - q = { - "box": Box.INBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - recipients = [] - for doc in DB.activities.find(q): - recipients.append( - doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"] - ) - - return list(set(recipients)) - - def following(self) -> List[str]: - q = { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.FOLLOW.value, - "meta.undo": False, - } - return [doc["activity"]["object"] for doc in DB.activities.find(q)] - def parse_collection( self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None ) -> List[str]: """Resolve/fetch a `Collection`/`OrderedCollection`.""" # Resolve internal collections via MongoDB directly if url == ID + "/followers": - return self.followers() + return followers() elif url == ID + "/following": - return self.following() + return following() return super().parse_collection(payload, url) - @ensure_it_is_me - def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool: - return bool( - DB.activities.find_one( - { - "box": Box.OUTBOX.value, - "type": ap.ActivityType.BLOCK.value, - "activity.object": actor_id, - "meta.undo": False, - } - ) - ) - def _fetch_iri(self, iri: str) -> ap.ObjectType: if iri == ME["id"]: return ME @@ -229,13 +219,6 @@ class MicroblogPubBackend(Backend): return data - @ensure_it_is_me - def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: - return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri})) - - def set_post_to_remote_inbox(self, cb): - self.post_to_remote_inbox_cb = cb - @ensure_it_is_me def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: DB.activities.update_one( @@ -471,7 +454,7 @@ class MicroblogPubBackend(Backend): ) if not creply: # It means the activity is not in the inbox, and not in the outbox, we want to save it - self.save(Box.REPLIES, reply) + save(Box.REPLIES, reply) new_threads.append(reply.id) while reply is not None: @@ -482,7 +465,7 @@ class MicroblogPubBackend(Backend): reply = ap.fetch_remote_activity(root_reply) q = {"activity.object.id": root_reply} if not DB.activities.count(q): - self.save(Box.REPLIES, reply) + save(Box.REPLIES, reply) new_threads.append(reply.id) DB.activities.update_one( @@ -493,25 +476,6 @@ class MicroblogPubBackend(Backend): {"$set": {"meta.thread_root_parent": root_reply}}, ) - def post_to_outbox(self, activity: ap.BaseActivity) -> None: - if activity.has_type(ap.CREATE_TYPES): - activity = activity.build_create() - - self.save(Box.OUTBOX, activity) - - # Assign create a random ID - obj_id = self.random_object_id() - activity.set_id(self.activity_url(obj_id), obj_id) - - recipients = activity.recipients() - logger.info(f"recipients={recipients}") - activity = ap.clean_activity(activity.to_dict()) - - payload = json.dumps(activity) - for recp in recipients: - logger.debug(f"posting to {recp}") - self.post_to_remote_inbox(self.get_actor(), payload, recp) - def gen_feed(): fg = FeedGenerator() diff --git a/app.py b/app.py index 00580bc..a2b9cfc 100644 --- a/app.py +++ b/app.py @@ -66,6 +66,8 @@ import config import tasks # noqa: here just for the migration # FIXME(tsileo): remove me from activitypub import Box from activitypub import embed_collection +from activitypub import save +from activitypub import followers_as_recipients from config import USER_AGENT from config import ADMIN_API_KEY from config import BASE_URL @@ -1643,7 +1645,7 @@ def inbox(): data["object"] ): logger.info(f"received a Delete for an actor {data!r}") - if get_backend().inbox_check_duplicate(MY_PERSON, data["id"]): + if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": data["id"]}): # The activity is already in the inbox logger.info(f"received duplicate activity {data!r}, dropping it") @@ -2295,7 +2297,9 @@ def task_finish_post_to_outbox(): elif obj.has_type(ap.ActivityType.ANNOUNCE): back.outbox_undo_announce(MY_PERSON, obj) elif obj.has_type(ap.ActivityType.FOLLOW): - back.undo_new_following(MY_PERSON, obj) + DB.activities.update_one( + {"remote_id": obj.id}, {"$set": {"meta.undo": True}} + ) app.logger.info(f"recipients={recipients}") activity = ap.clean_activity(activity.to_dict()) @@ -2345,7 +2349,9 @@ def task_finish_post_to_inbox(): elif obj.has_type(ap.ActivityType.ANNOUNCE): back.inbox_undo_announce(MY_PERSON, obj) elif obj.has_type(ap.ActivityType.FOLLOW): - back.undo_new_follower(MY_PERSON, obj) + DB.activities.update_one( + {"remote_id": obj.id}, {"$set": {"meta.undo": True}} + ) try: invalidate_cache(activity) except Exception: @@ -2367,7 +2373,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: obj_id = back.random_object_id() activity.set_id(back.activity_url(obj_id), obj_id) - back.save(Box.OUTBOX, activity) + save(Box.OUTBOX, activity) Tasks.cache_actor(activity.id) Tasks.finish_post_to_outbox(activity.id) return activity.id @@ -2376,7 +2382,14 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: def post_to_inbox(activity: ap.BaseActivity) -> None: # Check for Block activity actor = activity.get_actor() - if back.outbox_is_blocked(MY_PERSON, actor.id): + if DB.activities.find_one( + { + "box": Box.OUTBOX.value, + "type": ap.ActivityType.BLOCK.value, + "activity.object": actor.id, + "meta.undo": False, + } + ): app.logger.info( f"actor {actor!r} is blocked, dropping the received activity {activity!r}" ) @@ -2386,7 +2399,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: # The activity is already in the inbox app.logger.info(f"received duplicate activity {activity!r}, dropping it") - back.save(Box.INBOX, activity) + save(Box.INBOX, activity) Tasks.process_new_activity(activity.id) app.logger.info(f"spawning task for {activity!r}") @@ -2654,7 +2667,7 @@ def task_forward_activity(): iri = task.payload try: activity = ap.fetch_remote_activity(iri) - recipients = back.followers_as_recipients() + recipients = followers_as_recipients() app.logger.debug(f"Forwarding {activity!r} to {recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity) diff --git a/tasks.py b/tasks.py index f34d217..519dd48 100644 --- a/tasks.py +++ b/tasks.py @@ -312,7 +312,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: # The activity is already in the inbox log.info(f"received duplicate activity {activity!r}, dropping it") - back.save(Box.INBOX, activity) + activitypub.save(Box.INBOX, activity) process_new_activity.delay(activity.id) log.info(f"spawning task for {activity!r}") @@ -387,7 +387,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str: obj_id = back.random_object_id() activity.set_id(back.activity_url(obj_id), obj_id) - back.save(Box.OUTBOX, activity) + activitypub.save(Box.OUTBOX, activity) cache_actor.delay(activity.id) finish_post_to_outbox.delay(activity.id) return activity.id @@ -440,7 +440,7 @@ def finish_post_to_outbox(self, iri: str) -> None: def forward_activity(self, iri: str) -> None: try: activity = ap.fetch_remote_activity(iri) - recipients = back.followers_as_recipients() + recipients = activitypub.followers_as_recipients() log.debug(f"Forwarding {activity!r} to {recipients}") activity = ap.clean_activity(activity.to_dict()) payload = json.dumps(activity)