diff --git a/app/activitypub.py b/app/activitypub.py index 8e2a1f1..80e5201 100644 --- a/app/activitypub.py +++ b/app/activitypub.py @@ -327,17 +327,18 @@ def remove_context(raw_object: RawObject) -> RawObject: return a -def post(url: str, payload: dict[str, Any]) -> httpx.Response: +async def post(url: str, payload: dict[str, Any]) -> httpx.Response: check_url(url) - resp = httpx.post( - url, - headers={ - "User-Agent": config.USER_AGENT, - "Content-Type": config.AP_CONTENT_TYPE, - }, - json=payload, - auth=auth, - ) + async with httpx.AsyncClient() as client: + resp = await client.post( + url, + headers={ + "User-Agent": config.USER_AGENT, + "Content-Type": config.AP_CONTENT_TYPE, + }, + json=payload, + auth=auth, + ) resp.raise_for_status() return resp diff --git a/app/actor.py b/app/actor.py index 3761fc5..0e611e7 100644 --- a/app/actor.py +++ b/app/actor.py @@ -157,7 +157,12 @@ async def save_actor(db_session: AsyncSession, ap_actor: ap.RawObject) -> "Actor return actor -async def fetch_actor(db_session: AsyncSession, actor_id: str) -> "ActorModel": +async def fetch_actor( + db_session: AsyncSession, + actor_id: str, +) -> Union["ActorModel", RemoteActor]: + if actor_id == LOCAL_ACTOR.ap_id: + return LOCAL_ACTOR from app import models existing_actor = ( diff --git a/app/incoming_activities.py b/app/incoming_activities.py index 6a365d3..da1bf26 100644 --- a/app/incoming_activities.py +++ b/app/incoming_activities.py @@ -13,8 +13,8 @@ from app import ldsig from app import models from app.boxes import save_to_inbox from app.database import AsyncSession -from app.database import async_session from app.utils.datetime import now +from app.utils.workers import Worker _MAX_RETRIES = 8 @@ -67,11 +67,15 @@ def _set_next_try( outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries) -async def process_next_incoming_activity(db_session: AsyncSession) -> bool: +async def fetch_next_incoming_activity( + db_session: AsyncSession, + in_flight: set[int], +) -> models.IncomingActivity | None: where = [ models.IncomingActivity.next_try <= now(), models.IncomingActivity.is_errored.is_(False), models.IncomingActivity.is_processed.is_(False), + models.IncomingActivity.id.not_in(in_flight), ] q_count = await db_session.scalar( select(func.count(models.IncomingActivity.id)).where(*where) @@ -80,7 +84,7 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: logger.info(f"{q_count} incoming activities ready to process") if not q_count: # logger.debug("No activities to process") - return False + return None next_activity = ( await db_session.execute( @@ -91,6 +95,13 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: ) ).scalar_one() + return next_activity + + +async def process_next_incoming_activity( + db_session: AsyncSession, + next_activity: models.IncomingActivity, +) -> None: logger.info( f"incoming_activity={next_activity.ap_object}/" f"{next_activity.sent_by_ap_actor_id}" @@ -99,35 +110,45 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: next_activity.tries = next_activity.tries + 1 next_activity.last_try = now() - try: - async with db_session.begin_nested(): - await save_to_inbox( - db_session, - next_activity.ap_object, - next_activity.sent_by_ap_actor_id, - ) - except Exception: - logger.exception("Failed") - next_activity.error = traceback.format_exc() - _set_next_try(next_activity) - else: - logger.info("Success") - next_activity.is_processed = True + if next_activity.ap_object and next_activity.sent_by_ap_actor_id: + try: + async with db_session.begin_nested(): + await save_to_inbox( + db_session, + next_activity.ap_object, + next_activity.sent_by_ap_actor_id, + ) + except Exception: + logger.exception("Failed") + next_activity.error = traceback.format_exc() + _set_next_try(next_activity) + else: + logger.info("Success") + next_activity.is_processed = True + + # FIXME: webmention support await db_session.commit() - return True + return None + + +class IncomingActivityWorker(Worker[models.IncomingActivity]): + async def process_message( + self, + db_session: AsyncSession, + next_activity: models.IncomingActivity, + ) -> None: + await process_next_incoming_activity(db_session, next_activity) + + async def get_next_message( + self, + db_session: AsyncSession, + ) -> models.IncomingActivity | None: + return await fetch_next_incoming_activity(db_session, self.in_flight_ids()) async def loop() -> None: - async with async_session() as db_session: - while 1: - try: - await process_next_incoming_activity(db_session) - except Exception: - logger.exception("Failed to process next incoming activity") - raise - - await asyncio.sleep(1) + await IncomingActivityWorker(workers_count=1).run_forever() if __name__ == "__main__": diff --git a/app/models.py b/app/models.py index b2472c7..b04f6b6 100644 --- a/app/models.py +++ b/app/models.py @@ -320,7 +320,7 @@ class IncomingActivity(Base): ap_id = Column(String, nullable=True, index=True) ap_object: Mapped[ap.RawObject] = Column(JSON, nullable=True) - tries = Column(Integer, nullable=False, default=0) + tries: Mapped[int] = Column(Integer, nullable=False, default=0) next_try = Column(DateTime(timezone=True), nullable=True, default=now) last_try = Column(DateTime(timezone=True), nullable=True) diff --git a/app/outgoing_activities.py b/app/outgoing_activities.py index ae2f5c0..9f08de6 100644 --- a/app/outgoing_activities.py +++ b/app/outgoing_activities.py @@ -1,3 +1,4 @@ +import asyncio import email import time import traceback @@ -8,7 +9,6 @@ import httpx from loguru import logger from sqlalchemy import func from sqlalchemy import select -from sqlalchemy.orm import Session from sqlalchemy.orm import joinedload from app import activitypub as ap @@ -19,10 +19,10 @@ from app.actor import LOCAL_ACTOR from app.actor import _actor_hash from app.config import KEY_PATH from app.database import AsyncSession -from app.database import SessionLocal from app.key import Key from app.utils.datetime import now from app.utils.url import check_url +from app.utils.workers import Worker _MAX_RETRIES = 16 @@ -50,7 +50,9 @@ def _is_local_actor_updated() -> bool: return True -def _send_actor_update_if_needed(db_session: Session) -> None: +async def _send_actor_update_if_needed( + db_session: AsyncSession, +) -> None: """The process for sending an update for the local actor is done here as in production, we may have multiple uvicorn worker and this worker will always run in a single process.""" @@ -59,9 +61,9 @@ def _send_actor_update_if_needed(db_session: Session) -> None: logger.info("Will send an Update for the local actor") - from app.boxes import RemoteObject from app.boxes import allocate_outbox_id from app.boxes import outbox_object_id + from app.boxes import save_outbox_object update_activity_id = allocate_outbox_id() update_activity = { @@ -72,30 +74,15 @@ def _send_actor_update_if_needed(db_session: Session) -> None: "actor": config.ID, "object": ap.remove_context(LOCAL_ACTOR.ap_actor), } - ro = RemoteObject(update_activity, actor=LOCAL_ACTOR) - outbox_object = models.OutboxObject( - public_id=update_activity_id, - ap_type=ro.ap_type, - ap_id=ro.ap_id, - ap_context=ro.ap_context, - ap_object=ro.ap_object, - visibility=ro.visibility, - og_meta=None, - relates_to_inbox_object_id=None, - relates_to_outbox_object_id=None, - relates_to_actor_id=None, - activity_object_ap_id=LOCAL_ACTOR.ap_id, - is_hidden_from_homepage=True, - source=None, + outbox_object = await save_outbox_object( + db_session, update_activity_id, update_activity ) - db_session.add(outbox_object) - db_session.flush() # Send the update to the followers collection and all the actor we have ever # contacted followers = ( ( - db_session.scalars( + await db_session.scalars( select(models.Follower).options(joinedload(models.Follower.actor)) ) ) @@ -107,19 +94,17 @@ def _send_actor_update_if_needed(db_session: Session) -> None: for follower in followers } | { row.recipient - for row in db_session.execute( + for row in await db_session.execute( select(func.distinct(models.OutgoingActivity.recipient).label("recipient")) ) }: # type: ignore - outgoing_activity = models.OutgoingActivity( + await new_outgoing_activity( + db_session, recipient=rcp, outbox_object_id=outbox_object.id, - inbox_object_id=None, ) - db_session.add(outgoing_activity) - - db_session.commit() + await db_session.commit() async def new_outgoing_activity( @@ -183,50 +168,65 @@ def _set_next_try( outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries) -def process_next_outgoing_activity(db: Session) -> bool: +async def fetch_next_outgoing_activity( + db_session: AsyncSession, + in_fligh: set[int], +) -> models.OutgoingActivity | None: where = [ models.OutgoingActivity.next_try <= now(), models.OutgoingActivity.is_errored.is_(False), models.OutgoingActivity.is_sent.is_(False), + models.OutgoingActivity.id.not_in(in_fligh), ] - q_count = db.scalar(select(func.count(models.OutgoingActivity.id)).where(*where)) + q_count = await db_session.scalar( + select(func.count(models.OutgoingActivity.id)).where(*where) + ) if q_count > 0: logger.info(f"{q_count} outgoing activities ready to process") if not q_count: # logger.debug("No activities to process") - return False + return None - next_activity = db.execute( - select(models.OutgoingActivity) - .where(*where) - .limit(1) - .options( - joinedload(models.OutgoingActivity.inbox_object), - joinedload(models.OutgoingActivity.outbox_object), + next_activity = ( + await db_session.execute( + select(models.OutgoingActivity) + .where(*where) + .limit(1) + .options( + joinedload(models.OutgoingActivity.inbox_object), + joinedload(models.OutgoingActivity.outbox_object), + ) + .order_by(models.OutgoingActivity.next_try) ) - .order_by(models.OutgoingActivity.next_try) ).scalar_one() + return next_activity - next_activity.tries = next_activity.tries + 1 + +async def process_next_outgoing_activity( + db_session: AsyncSession, + next_activity: models.OutgoingActivity, +) -> None: + next_activity.tries = next_activity.tries + 1 # type: ignore next_activity.last_try = now() logger.info(f"recipient={next_activity.recipient}") try: - if next_activity.webmention_target: + if next_activity.webmention_target and next_activity.outbox_object: webmention_payload = { "source": next_activity.outbox_object.url, "target": next_activity.webmention_target, } logger.info(f"{webmention_payload=}") check_url(next_activity.recipient) - resp = httpx.post( - next_activity.recipient, - data=webmention_payload, - headers={ - "User-Agent": config.USER_AGENT, - }, - ) + async with httpx.AsyncClient() as client: + resp = await client.post( + next_activity.recipient, # type: ignore + data=webmention_payload, + headers={ + "User-Agent": config.USER_AGENT, + }, + ) resp.raise_for_status() else: payload = ap.wrap_object_if_needed(next_activity.anybox_object.ap_object) @@ -238,12 +238,12 @@ def process_next_outgoing_activity(db: Session) -> bool: "Delete", ]: # But only if the object is public (to help with deniability/privacy) - if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: + if next_activity.outbox_object.visibility == ap.VisibilityEnum.PUBLIC: # type: ignore # noqa: E501 ldsig.generate_signature(payload, k) logger.info(f"{payload=}") - resp = ap.post(next_activity.recipient, payload) + resp = await ap.post(next_activity.recipient, payload) # type: ignore except httpx.HTTPStatusError as http_error: logger.exception("Failed") next_activity.last_status_code = http_error.response.status_code @@ -273,22 +273,31 @@ def process_next_outgoing_activity(db: Session) -> bool: next_activity.last_status_code = resp.status_code next_activity.last_response = resp.text - db.commit() - return True + await db_session.commit() + return None -def loop() -> None: - db = SessionLocal() - _send_actor_update_if_needed(db) - while 1: - try: - process_next_outgoing_activity(db) - except Exception: - logger.exception("Failed to process next outgoing activity") - raise +class OutgoingActivityWorker(Worker[models.OutgoingActivity]): + async def process_message( + self, + db_session: AsyncSession, + next_activity: models.OutgoingActivity, + ) -> None: + await process_next_outgoing_activity(db_session, next_activity) - time.sleep(1) + async def get_next_message( + self, + db_session: AsyncSession, + ) -> models.OutgoingActivity | None: + return await fetch_next_outgoing_activity(db_session, self.in_flight_ids()) + + async def startup(self, db_session: AsyncSession) -> None: + await _send_actor_update_if_needed(db_session) + + +async def loop() -> None: + await OutgoingActivityWorker(workers_count=3).run_forever() if __name__ == "__main__": - loop() + asyncio.run(loop()) diff --git a/app/utils/opengraph.py b/app/utils/opengraph.py index 6a66fe1..7308859 100644 --- a/app/utils/opengraph.py +++ b/app/utils/opengraph.py @@ -7,6 +7,7 @@ import httpx from bs4 import BeautifulSoup # type: ignore from pydantic import BaseModel +from loguru import logger from app import ap_object from app import config from app.actor import LOCAL_ACTOR @@ -64,7 +65,7 @@ async def external_urls( for tag in ro.tags: if tag_href := tag.get("href"): tags_hrefs.add(tag_href) - if tag.get("type") == "Mention" and tag["name"] != LOCAL_ACTOR.handle: + if tag.get("type") == "Mention": mentioned_actor = await fetch_actor(db_session, tag["href"]) tags_hrefs.add(mentioned_actor.url) tags_hrefs.add(mentioned_actor.ap_id) diff --git a/docs/templates/layout.html b/docs/templates/layout.html index f398f3c..65860ea 100644 --- a/docs/templates/layout.html +++ b/docs/templates/layout.html @@ -93,6 +93,7 @@ footer {