diff --git a/app/actor.py b/app/actor.py index ca56f98..7cc85eb 100644 --- a/app/actor.py +++ b/app/actor.py @@ -1,3 +1,4 @@ +import hashlib import typing from dataclasses import dataclass from typing import Union @@ -226,3 +227,29 @@ async def get_actors_metadata( inbox_follow_ap_id=followers.get(actor.ap_id), ) return idx + + +def _actor_hash(actor: Actor) -> bytes: + """Used to detect when an actor is updated""" + h = hashlib.blake2b(digest_size=32) + h.update(actor.ap_id.encode()) + h.update(actor.handle.encode()) + + if actor.name: + h.update(actor.name.encode()) + + if actor.summary: + h.update(actor.summary.encode()) + + if actor.url: + h.update(actor.url.encode()) + + h.update(actor.display_name.encode()) + + if actor.icon_url: + h.update(actor.icon_url.encode()) + + h.update(actor.public_key_id.encode()) + h.update(actor.public_key_as_pem.encode()) + + return h.digest() diff --git a/app/outgoing_activities.py b/app/outgoing_activities.py index 04a9201..e7db4d5 100644 --- a/app/outgoing_activities.py +++ b/app/outgoing_activities.py @@ -15,6 +15,8 @@ from app import activitypub as ap from app import config from app import ldsig from app import models +from app.actor import LOCAL_ACTOR +from app.actor import _actor_hash from app.config import KEY_PATH from app.database import AsyncSession from app.database import SessionLocal @@ -27,6 +29,92 @@ k = Key(config.ID, f"{config.ID}#main-key") k.load(KEY_PATH.read_text()) +def _is_local_actor_updated() -> bool: + """Returns True if the local actor was updated, i.e. updated via the config file""" + actor_hash = _actor_hash(LOCAL_ACTOR) + actor_hash_cache = config.ROOT_DIR / "data" / "local_actor_hash.dat" + + if not actor_hash_cache.exists(): + logger.info("Initializing local actor hash cache") + actor_hash_cache.write_bytes(actor_hash) + return False + + previous_actor_hash = actor_hash_cache.read_bytes() + if previous_actor_hash == actor_hash: + logger.info("Local actor hasn't been updated") + return False + + actor_hash_cache.write_bytes(actor_hash) + logger.info("Local actor has been updated") + return True + + +def _send_actor_update_if_needed(db_session: Session) -> None: + """The process for sending an update for the local actor is done here as + in production, we may have multiple uvicorn worker and this worker will + always run in a single process.""" + if not _is_local_actor_updated(): + return + + logger.info("Will send an Update for the local actor") + + from app.boxes import RemoteObject + from app.boxes import allocate_outbox_id + from app.boxes import outbox_object_id + + update_activity_id = allocate_outbox_id() + update_activity = { + "@context": ap.AS_EXTENDED_CTX, + "id": outbox_object_id(update_activity_id), + "type": "Update", + "to": [ap.AS_PUBLIC], + "actor": config.ID, + "object": ap.remove_context(LOCAL_ACTOR.ap_actor), + } + ro = RemoteObject(update_activity, actor=LOCAL_ACTOR) + outbox_object = models.OutboxObject( + public_id=update_activity_id, + ap_type=ro.ap_type, + ap_id=ro.ap_id, + ap_context=ro.ap_context, + ap_object=ro.ap_object, + visibility=ro.visibility, + og_meta=None, + relates_to_inbox_object_id=None, + relates_to_outbox_object_id=None, + relates_to_actor_id=None, + activity_object_ap_id=LOCAL_ACTOR.ap_id, + is_hidden_from_homepage=True, + source=None, + ) + db_session.add(outbox_object) + db_session.flush() + + # TODO(ts): also send to every actor we contact (distinct on recipient) + followers = ( + ( + db_session.scalars( + select(models.Follower).options(joinedload(models.Follower.actor)) + ) + ) + .unique() + .all() + ) + for rcp in { + follower.actor.shared_inbox_url or follower.actor.inbox_url + for follower in followers + }: + outgoing_activity = models.OutgoingActivity( + recipient=rcp, + outbox_object_id=outbox_object.id, + inbox_object_id=None, + ) + + db_session.add(outgoing_activity) + + db_session.commit() + + async def new_outgoing_activity( db_session: AsyncSession, recipient: str, @@ -116,6 +204,7 @@ def process_next_outgoing_activity(db: Session) -> bool: # Use LD sig if the activity may need to be forwarded by recipients if next_activity.anybox_object.is_from_outbox and payload["type"] in [ "Create", + "Update", "Delete", ]: # But only if the object is public (to help with deniability/privacy) @@ -160,6 +249,7 @@ def process_next_outgoing_activity(db: Session) -> bool: def loop() -> None: db = SessionLocal() + _send_actor_update_if_needed(db) while 1: try: process_next_outgoing_activity(db)