Tweak/fix incoming activity processing

This commit is contained in:
Thomas Sileo 2022-07-20 21:40:27 +02:00
parent 4a975dcbfa
commit 3f4a266157
3 changed files with 16 additions and 15 deletions

View file

@ -75,7 +75,7 @@ async def save_outbox_object(
source=source, source=source,
) )
db_session.add(outbox_object) db_session.add(outbox_object)
await db_session.commit() await db_session.flush()
await db_session.refresh(outbox_object) await db_session.refresh(outbox_object)
return outbox_object return outbox_object
@ -116,6 +116,8 @@ async def send_delete(db_session: AsyncSession, ap_object_id: str) -> None:
for rcp in recipients: for rcp in recipients:
await new_outgoing_activity(db_session, rcp, outbox_object.id) await new_outgoing_activity(db_session, rcp, outbox_object.id)
await db_session.commit()
async def send_like(db_session: AsyncSession, ap_object_id: str) -> None: async def send_like(db_session: AsyncSession, ap_object_id: str) -> None:
inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id)
@ -137,11 +139,11 @@ async def send_like(db_session: AsyncSession, ap_object_id: str) -> None:
raise ValueError("Should never happen") raise ValueError("Should never happen")
inbox_object.liked_via_outbox_object_ap_id = outbox_object.ap_id inbox_object.liked_via_outbox_object_ap_id = outbox_object.ap_id
await db_session.commit()
await new_outgoing_activity( await new_outgoing_activity(
db_session, inbox_object.actor.inbox_url, outbox_object.id db_session, inbox_object.actor.inbox_url, outbox_object.id
) )
await db_session.commit()
async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None: async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None:
@ -195,6 +197,7 @@ async def send_follow(db_session: AsyncSession, ap_actor_id: str) -> None:
raise ValueError("Should never happen") raise ValueError("Should never happen")
await new_outgoing_activity(db_session, actor.inbox_url, outbox_object.id) await new_outgoing_activity(db_session, actor.inbox_url, outbox_object.id)
await db_session.commit()
async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None: async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None:
@ -244,7 +247,6 @@ async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None:
models.Following.ap_actor_id == followed_actor.ap_id models.Following.ap_actor_id == followed_actor.ap_id
) )
) )
await db_session.commit()
elif outbox_object_to_undo.ap_type == "Like": elif outbox_object_to_undo.ap_type == "Like":
liked_object_ap_id = outbox_object_to_undo.activity_object_ap_id liked_object_ap_id = outbox_object_to_undo.activity_object_ap_id
if not liked_object_ap_id: if not liked_object_ap_id:
@ -278,6 +280,8 @@ async def send_undo(db_session: AsyncSession, ap_object_id: str) -> None:
else: else:
raise ValueError("Should never happen") raise ValueError("Should never happen")
await db_session.commit()
async def send_create( async def send_create(
db_session: AsyncSession, db_session: AsyncSession,
@ -368,8 +372,6 @@ async def send_create(
) )
db_session.add(outbox_object_attachment) db_session.add(outbox_object_attachment)
await db_session.commit()
recipients = await _compute_recipients(db_session, note) recipients = await _compute_recipients(db_session, note)
for rcp in recipients: for rcp in recipients:
await new_outgoing_activity(db_session, rcp, outbox_object.id) await new_outgoing_activity(db_session, rcp, outbox_object.id)
@ -389,6 +391,7 @@ async def send_create(
webmention_target=target, webmention_target=target,
) )
await db_session.commit()
return note_id return note_id
@ -439,7 +442,6 @@ async def send_update(
outbox_object.ap_object = note outbox_object.ap_object = note
outbox_object.source = source outbox_object.source = source
outbox_object.revisions = revisions outbox_object.revisions = revisions
await db_session.commit()
recipients = await _compute_recipients(db_session, note) recipients = await _compute_recipients(db_session, note)
for rcp in recipients: for rcp in recipients:
@ -460,6 +462,7 @@ async def send_update(
webmention_target=target, webmention_target=target,
) )
await db_session.commit()
return outbox_object.public_id # type: ignore return outbox_object.public_id # type: ignore

View file

@ -98,18 +98,16 @@ async def process_next_incoming_activity(db_session: AsyncSession) -> bool:
next_activity.tries = next_activity.tries + 1 next_activity.tries = next_activity.tries + 1
next_activity.last_try = now() next_activity.last_try = now()
await db_session.commit()
try: try:
# async with db_session.begin_nested(): async with db_session.begin_nested():
await save_to_inbox( await save_to_inbox(
db_session, db_session,
next_activity.ap_object, next_activity.ap_object,
next_activity.sent_by_ap_actor_id, next_activity.sent_by_ap_actor_id,
) )
except Exception: except Exception:
logger.exception("Failed") logger.exception("Failed")
await db_session.rollback()
next_activity.error = traceback.format_exc() next_activity.error = traceback.format_exc()
_set_next_try(next_activity) _set_next_try(next_activity)
else: else:

View file

@ -144,7 +144,7 @@ async def new_outgoing_activity(
) )
db_session.add(outgoing_activity) db_session.add(outgoing_activity)
await db_session.commit() await db_session.flush()
await db_session.refresh(outgoing_activity) await db_session.refresh(outgoing_activity)
return outgoing_activity return outgoing_activity