From 3f4a266157f1035d71ccbd043e26bcad46d834d8 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Wed, 20 Jul 2022 21:40:27 +0200 Subject: [PATCH] Tweak/fix incoming activity processing --- app/boxes.py | 15 +++++++++------ app/incoming_activities.py | 14 ++++++-------- app/outgoing_activities.py | 2 +- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/app/boxes.py b/app/boxes.py index a2370fb..ad8b7f6 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -75,7 +75,7 @@ async def save_outbox_object( source=source, ) db_session.add(outbox_object) - await db_session.commit() + await db_session.flush() await db_session.refresh(outbox_object) return outbox_object @@ -116,6 +116,8 @@ async def send_delete(db_session: AsyncSession, ap_object_id: str) -> None: for rcp in recipients: await new_outgoing_activity(db_session, rcp, outbox_object.id) + await db_session.commit() + async def send_like(db_session: AsyncSession, ap_object_id: str) -> None: inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) @@ -137,11 +139,11 @@ async def send_like(db_session: AsyncSession, ap_object_id: str) -> None: raise ValueError("Should never happen") inbox_object.liked_via_outbox_object_ap_id = outbox_object.ap_id - await db_session.commit() await new_outgoing_activity( db_session, inbox_object.actor.inbox_url, outbox_object.id ) + await db_session.commit() async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None: @@ -195,6 +197,7 @@ async def send_follow(db_session: AsyncSession, ap_actor_id: str) -> None: raise ValueError("Should never happen") await new_outgoing_activity(db_session, actor.inbox_url, outbox_object.id) + await db_session.commit() async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None: @@ -244,7 +247,6 @@ async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None: models.Following.ap_actor_id == followed_actor.ap_id ) ) - await db_session.commit() elif outbox_object_to_undo.ap_type == "Like": liked_object_ap_id = outbox_object_to_undo.activity_object_ap_id if not liked_object_ap_id: @@ -278,6 +280,8 @@ async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None: else: raise ValueError("Should never happen") + await db_session.commit() + async def send_create( db_session: AsyncSession, @@ -368,8 +372,6 @@ async def send_create( ) db_session.add(outbox_object_attachment) - await db_session.commit() - recipients = await _compute_recipients(db_session, note) for rcp in recipients: await new_outgoing_activity(db_session, rcp, outbox_object.id) @@ -389,6 +391,7 @@ async def send_create( webmention_target=target, ) + await db_session.commit() return note_id @@ -439,7 +442,6 @@ async def send_update( outbox_object.ap_object = note outbox_object.source = source outbox_object.revisions = revisions - await db_session.commit() recipients = await _compute_recipients(db_session, note) for rcp in recipients: @@ -460,6 +462,7 @@ async def send_update( webmention_target=target, ) + await db_session.commit() return outbox_object.public_id # type: ignore diff --git a/app/incoming_activities.py b/app/incoming_activities.py index 2c42e66..6a365d3 100644 --- a/app/incoming_activities.py +++ b/app/incoming_activities.py @@ -98,18 +98,16 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool: next_activity.tries = next_activity.tries + 1 next_activity.last_try = now() - await db_session.commit() try: - # async with db_session.begin_nested(): - await save_to_inbox( - db_session, - next_activity.ap_object, - next_activity.sent_by_ap_actor_id, - ) + async with db_session.begin_nested(): + await save_to_inbox( + db_session, + next_activity.ap_object, + next_activity.sent_by_ap_actor_id, + ) except Exception: logger.exception("Failed") - await db_session.rollback() next_activity.error = traceback.format_exc() _set_next_try(next_activity) else: diff --git a/app/outgoing_activities.py b/app/outgoing_activities.py index a2a773e..ae2f5c0 100644 --- a/app/outgoing_activities.py +++ b/app/outgoing_activities.py @@ -144,7 +144,7 @@ async def new_outgoing_activity( ) db_session.add(outgoing_activity) - await db_session.commit() + await db_session.flush() await db_session.refresh(outgoing_activity) return outgoing_activity