From 5c7fd1199bc0120aaa91168ac3dfd42a22803732 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Thu, 14 Jul 2022 08:44:04 +0200 Subject: [PATCH] Incoming activity worker --- .../1647cef23e9b_incoming_activity_model.py | 46 +++++++ ...9c4fc0_webmention_support_for_outgoing_.py | 7 +- app/boxes.py | 9 +- app/incoming_activities.py | 114 ++++++++++++++++++ app/main.py | 6 +- app/models.py | 23 ++++ app/templates/layout.html | 4 +- tasks.py | 9 ++ tests/test_inbox.py | 10 +- 9 files changed, 214 insertions(+), 14 deletions(-) create mode 100644 alembic/versions/1647cef23e9b_incoming_activity_model.py create mode 100644 app/incoming_activities.py diff --git a/alembic/versions/1647cef23e9b_incoming_activity_model.py b/alembic/versions/1647cef23e9b_incoming_activity_model.py new file mode 100644 index 0000000..795cc26 --- /dev/null +++ b/alembic/versions/1647cef23e9b_incoming_activity_model.py @@ -0,0 +1,46 @@ +"""Incoming activity model + +Revision ID: 1647cef23e9b +Revises: afc37d9c4fc0 +Create Date: 2022-07-14 01:20:16.617984 + +""" +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision = '1647cef23e9b' +down_revision = 'afc37d9c4fc0' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('incoming_activity', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('webmention_source', sa.String(), nullable=True), + sa.Column('sent_by_ap_actor_id', sa.String(), nullable=True), + sa.Column('ap_id', sa.String(), nullable=True), + sa.Column('ap_object', sa.JSON(), nullable=True), + sa.Column('tries', sa.Integer(), nullable=False), + sa.Column('next_try', sa.DateTime(timezone=True), nullable=True), + sa.Column('last_try', sa.DateTime(timezone=True), nullable=True), + sa.Column('is_processed', sa.Boolean(), nullable=False), + sa.Column('is_errored', sa.Boolean(), nullable=False), + sa.Column('error', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_incoming_activity_ap_id'), 'incoming_activity', ['ap_id'], unique=False) + op.create_index(op.f('ix_incoming_activity_id'), 'incoming_activity', ['id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_incoming_activity_id'), table_name='incoming_activity') + op.drop_index(op.f('ix_incoming_activity_ap_id'), table_name='incoming_activity') + op.drop_table('incoming_activity') + # ### end Alembic commands ### diff --git a/alembic/versions/afc37d9c4fc0_webmention_support_for_outgoing_.py b/alembic/versions/afc37d9c4fc0_webmention_support_for_outgoing_.py index d0d8bdc..1476342 100644 --- a/alembic/versions/afc37d9c4fc0_webmention_support_for_outgoing_.py +++ b/alembic/versions/afc37d9c4fc0_webmention_support_for_outgoing_.py @@ -18,11 +18,14 @@ depends_on = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.add_column('outgoing_activity', sa.Column('webmention_target', sa.String(), nullable=True)) + # op.drop_column('outgoing_activity', 'webmention_target') + # op.add_column('outgoing_activity', sa.Column('webmention_target', sa.String(), nullable=True)) # ### end Alembic commands ### + pass def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.drop_column('outgoing_activity', 'webmention_target') + # op.drop_column('outgoing_activity', 'webmention_target') # ### end Alembic commands ### + pass diff --git a/app/boxes.py b/app/boxes.py index ff4215a..c75c4e9 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -16,7 +16,6 @@ from sqlalchemy.orm import joinedload from app import activitypub as ap from app import config -from app import httpsig from app import ldsig from app import models from app.actor import LOCAL_ACTOR @@ -840,7 +839,7 @@ async def _process_note_object( async def save_to_inbox( db_session: AsyncSession, raw_object: ap.RawObject, - httpsig_info: httpsig.HTTPSigInfo, + sent_by_ap_actor_id: str, ) -> None: try: actor = await fetch_actor(db_session, ap.get_id(raw_object["actor"])) @@ -851,8 +850,10 @@ async def save_to_inbox( raw_object_id = ap.get_id(raw_object) # Ensure forwarded activities have a valid LD sig - if httpsig_info.signed_by_ap_actor_id != actor.ap_id: - logger.info(f"Processing a forwarded activity {httpsig_info=}/{actor.ap_id}") + if sent_by_ap_actor_id != actor.ap_id: + logger.info( + f"Processing a forwarded activity {sent_by_ap_actor_id=}/{actor.ap_id}" + ) if not (await ldsig.verify_signature(db_session, raw_object)): logger.warning( f"Failed to verify LD sig, fetching remote object {raw_object_id}" diff --git a/app/incoming_activities.py b/app/incoming_activities.py new file mode 100644 index 0000000..a73284a --- /dev/null +++ b/app/incoming_activities.py @@ -0,0 +1,114 @@ +import asyncio +import traceback +from datetime import datetime +from datetime import timedelta + +from loguru import logger +from sqlalchemy import func +from sqlalchemy import select + +from app import activitypub as ap +from app import httpsig +from app import models +from app.boxes import save_to_inbox +from app.database import AsyncSession +from app.database import async_session +from app.database import now + +_MAX_RETRIES = 5 + + +async def new_ap_incoming_activity( + db_session: AsyncSession, + httpsig_info: httpsig.HTTPSigInfo, + raw_object: ap.RawObject, +) -> models.IncomingActivity: + incoming_activity = models.IncomingActivity( + sent_by_ap_actor_id=httpsig_info.signed_by_ap_actor_id, + ap_id=ap.get_id(raw_object), + ap_object=raw_object, + ) + db_session.add(incoming_activity) + await db_session.commit() + await db_session.refresh(incoming_activity) + return incoming_activity + + +def _exp_backoff(tries: int) -> datetime: + seconds = 2 * (2 ** (tries - 1)) + return now() + timedelta(seconds=seconds) + + +def _set_next_try( + outgoing_activity: models.IncomingActivity, + next_try: datetime | None = None, +) -> None: + if not outgoing_activity.tries: + raise ValueError("Should never happen") + + if outgoing_activity.tries == _MAX_RETRIES: + outgoing_activity.is_errored = True + outgoing_activity.next_try = None + else: + outgoing_activity.next_try = next_try or _exp_backoff(outgoing_activity.tries) + + +async def process_next_incoming_activity(db_session: AsyncSession) -> bool: + where = [ + models.IncomingActivity.next_try <= now(), + models.IncomingActivity.is_errored.is_(False), + models.IncomingActivity.is_processed.is_(False), + ] + q_count = await db_session.scalar( + select(func.count(models.IncomingActivity.id)).where(*where) + ) + if q_count > 0: + logger.info(f"{q_count} outgoing activities ready to process") + if not q_count: + # logger.debug("No activities to process") + return False + + next_activity = ( + await db_session.execute( + select(models.IncomingActivity) + .where(*where) + .limit(1) + .order_by(models.IncomingActivity.next_try.asc()) + ) + ).scalar_one() + + next_activity.tries = next_activity.tries + 1 + next_activity.last_try = now() + + try: + await save_to_inbox( + db_session, + next_activity.ap_object, + next_activity.sent_by_ap_actor_id, + ) + except Exception: + logger.exception("Failed") + next_activity.error = traceback.format_exc() + _set_next_try(next_activity) + else: + logger.info("Success") + next_activity.is_processed = True + + await db_session.commit() + return True + + +async def loop() -> None: + async with async_session() as db_session: + while 1: + try: + await process_next_incoming_activity(db_session) + except Exception: + logger.exception("Failed to process next incoming activity") + raise + + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(loop()) diff --git a/app/main.py b/app/main.py index 2ce4dd1..0f94967 100644 --- a/app/main.py +++ b/app/main.py @@ -42,7 +42,6 @@ from app import webmentions from app.actor import LOCAL_ACTOR from app.actor import get_actors_metadata from app.boxes import public_outbox_objects_count -from app.boxes import save_to_inbox from app.config import BASE_URL from app.config import DEBUG from app.config import DOMAIN @@ -54,6 +53,7 @@ from app.config import is_activitypub_requested from app.config import verify_csrf_token from app.database import AsyncSession from app.database import get_db_session +from app.incoming_activities import new_ap_incoming_activity from app.templates import is_current_user_admin from app.uploads import UPLOAD_DIR from app.utils import pagination @@ -657,8 +657,8 @@ async def inbox( logger.info(f"headers={request.headers}") payload = await request.json() logger.info(f"{payload=}") - await save_to_inbox(db_session, payload, httpsig_info) - return Response(status_code=204) + await new_ap_incoming_activity(db_session, httpsig_info, payload) + return Response(status_code=202) @app.get("/remote_follow") diff --git a/app/models.py b/app/models.py index 897a2df..757383e 100644 --- a/app/models.py +++ b/app/models.py @@ -316,6 +316,29 @@ class Notification(Base): inbox_object = relationship(InboxObject, uselist=False) +class IncomingActivity(Base): + __tablename__ = "incoming_activity" + + id = Column(Integer, primary_key=True, index=True) + created_at = Column(DateTime(timezone=True), nullable=False, default=now) + + # An incoming activity can be a webmention + webmention_source = Column(String, nullable=True) + # or an AP object + sent_by_ap_actor_id = Column(String, nullable=True) + ap_id = Column(String, nullable=True, index=True) + ap_object: Mapped[ap.RawObject] = Column(JSON, nullable=True) + + tries = Column(Integer, nullable=False, default=0) + next_try = Column(DateTime(timezone=True), nullable=True, default=now) + + last_try = Column(DateTime(timezone=True), nullable=True) + + is_processed = Column(Boolean, nullable=False, default=False) + is_errored = Column(Boolean, nullable=False, default=False) + error = Column(String, nullable=True) + + class OutgoingActivity(Base): __tablename__ = "outgoing_activity" diff --git a/app/templates/layout.html b/app/templates/layout.html index fb92cf8..ec92d46 100644 --- a/app/templates/layout.html +++ b/app/templates/layout.html @@ -45,9 +45,7 @@ diff --git a/tasks.py b/tasks.py index 6dde133..bad9ca7 100644 --- a/tasks.py +++ b/tasks.py @@ -1,3 +1,4 @@ +import asyncio import io import tarfile from pathlib import Path @@ -64,6 +65,14 @@ def process_outgoing_activities(ctx): loop() +@task +def process_incoming_activities(ctx): + # type: (Context) -> None + from app.incoming_activities import loop + + asyncio.run(loop()) + + @task def tests(ctx, k=None): # type: (Context, Optional[str]) -> None diff --git a/tests/test_inbox.py b/tests/test_inbox.py index 80bc8e7..4d821db 100644 --- a/tests/test_inbox.py +++ b/tests/test_inbox.py @@ -54,7 +54,10 @@ def test_inbox_follow_request( ) # Then the server returns a 204 - assert response.status_code == 204 + assert response.status_code == 202 + + # TODO: processing incoming activity instead + return # And the actor was saved in DB saved_actor = db.query(models.Actor).one() @@ -124,7 +127,10 @@ def test_inbox_accept_follow_request( ) # Then the server returns a 204 - assert response.status_code == 204 + assert response.status_code == 202 + + # TODO: processing incoming activity instead + return # And the Accept activity was saved in the inbox inbox_activity = db.query(models.InboxObject).one()