forked from forks/microblog.pub
Cleanup inbox processing
This commit is contained in:
parent
8e57bb9245
commit
2d28ca3614
2 changed files with 129 additions and 131 deletions
259
app/boxes.py
259
app/boxes.py
|
@ -839,9 +839,24 @@ async def _handle_delete_activity(
|
||||||
db_session: AsyncSession,
|
db_session: AsyncSession,
|
||||||
from_actor: models.Actor,
|
from_actor: models.Actor,
|
||||||
delete_activity: models.InboxObject,
|
delete_activity: models.InboxObject,
|
||||||
ap_object_to_delete: models.InboxObject | models.Actor | None,
|
relates_to_inbox_object: models.InboxObject | None,
|
||||||
forwarded_by_actor: models.Actor | None,
|
forwarded_by_actor: models.Actor | None,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
ap_object_to_delete: models.InboxObject | models.Actor | None
|
||||||
|
if relates_to_inbox_object:
|
||||||
|
ap_object_to_delete = relates_to_inbox_object
|
||||||
|
elif delete_activity.activity_object_ap_id:
|
||||||
|
# If it's not a Delete for an inbox object, it may be related to
|
||||||
|
# an actor
|
||||||
|
try:
|
||||||
|
ap_object_to_delete = await fetch_actor(
|
||||||
|
db_session,
|
||||||
|
delete_activity.activity_object_ap_id,
|
||||||
|
save_if_not_found=False,
|
||||||
|
)
|
||||||
|
except ap.ObjectNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
if ap_object_to_delete is None:
|
if ap_object_to_delete is None:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Received Delete for an unknown object "
|
"Received Delete for an unknown object "
|
||||||
|
@ -918,6 +933,8 @@ async def _handle_delete_activity(
|
||||||
forwarded_by_actor=None,
|
forwarded_by_actor=None,
|
||||||
)
|
)
|
||||||
inbox_object.is_deleted = True
|
inbox_object.is_deleted = True
|
||||||
|
else:
|
||||||
|
raise ValueError("Should never happen")
|
||||||
|
|
||||||
await db_session.flush()
|
await db_session.flush()
|
||||||
|
|
||||||
|
@ -1595,6 +1612,101 @@ async def _handle_vote_answer(
|
||||||
await new_outgoing_activity(db_session, rcp, question.id)
|
await new_outgoing_activity(db_session, rcp, question.id)
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_announce_activity(
|
||||||
|
db_session: AsyncSession,
|
||||||
|
actor: models.Actor,
|
||||||
|
announce_activity: models.InboxObject,
|
||||||
|
relates_to_outbox_object: models.OutboxObject | None,
|
||||||
|
relates_to_inbox_object: models.InboxObject | None,
|
||||||
|
):
|
||||||
|
if relates_to_outbox_object:
|
||||||
|
# This is an announce for a local object
|
||||||
|
relates_to_outbox_object.announces_count = (
|
||||||
|
models.OutboxObject.announces_count + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
notif = models.Notification(
|
||||||
|
notification_type=models.NotificationType.ANNOUNCE,
|
||||||
|
actor_id=actor.id,
|
||||||
|
outbox_object_id=relates_to_outbox_object.id,
|
||||||
|
inbox_object_id=announce_activity.id,
|
||||||
|
)
|
||||||
|
db_session.add(notif)
|
||||||
|
else:
|
||||||
|
# Only show the announce in the stream if it comes from an actor
|
||||||
|
# in the following collection
|
||||||
|
followings = await _get_following(db_session)
|
||||||
|
is_from_following = announce_activity.actor.ap_id in {
|
||||||
|
f.ap_actor_id for f in followings
|
||||||
|
}
|
||||||
|
|
||||||
|
# This is announce for a maybe unknown object
|
||||||
|
if relates_to_inbox_object:
|
||||||
|
# We already know about this object, show the announce in the
|
||||||
|
# stream if it's not already there, from an followed actor
|
||||||
|
# and if we haven't seen it recently
|
||||||
|
if (
|
||||||
|
now() - as_utc(relates_to_inbox_object.ap_published_at) # type: ignore
|
||||||
|
) > timedelta(hours=1):
|
||||||
|
announce_activity.is_hidden_from_stream = not is_from_following
|
||||||
|
else:
|
||||||
|
# Save it as an inbox object
|
||||||
|
if not announce_activity.activity_object_ap_id:
|
||||||
|
raise ValueError("Should never happen")
|
||||||
|
announced_raw_object = await ap.fetch(
|
||||||
|
announce_activity.activity_object_ap_id
|
||||||
|
)
|
||||||
|
announced_actor = await fetch_actor(
|
||||||
|
db_session, ap.get_actor_id(announced_raw_object)
|
||||||
|
)
|
||||||
|
if not announced_actor.is_blocked:
|
||||||
|
announced_object = RemoteObject(announced_raw_object, announced_actor)
|
||||||
|
announced_inbox_object = models.InboxObject(
|
||||||
|
server=urlparse(announced_object.ap_id).hostname,
|
||||||
|
actor_id=announced_actor.id,
|
||||||
|
ap_actor_id=announced_actor.ap_id,
|
||||||
|
ap_type=announced_object.ap_type,
|
||||||
|
ap_id=announced_object.ap_id,
|
||||||
|
ap_context=announced_object.ap_context,
|
||||||
|
ap_published_at=announced_object.ap_published_at,
|
||||||
|
ap_object=announced_object.ap_object,
|
||||||
|
visibility=announced_object.visibility,
|
||||||
|
og_meta=await opengraph.og_meta_from_note(
|
||||||
|
db_session, announced_object
|
||||||
|
),
|
||||||
|
is_hidden_from_stream=True,
|
||||||
|
)
|
||||||
|
db_session.add(announced_inbox_object)
|
||||||
|
await db_session.flush()
|
||||||
|
announce_activity.relates_to_inbox_object_id = announced_inbox_object.id
|
||||||
|
announce_activity.is_hidden_from_stream = not is_from_following
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_like_activity(
|
||||||
|
db_session: AsyncSession,
|
||||||
|
actor: models.Actor,
|
||||||
|
like_activity: models.InboxObject,
|
||||||
|
relates_to_outbox_object: models.OutboxObject | None,
|
||||||
|
relates_to_inbox_object: models.InboxObject | None,
|
||||||
|
):
|
||||||
|
if not relates_to_outbox_object:
|
||||||
|
logger.info(
|
||||||
|
"Received a like for an unknown activity: "
|
||||||
|
f"{like_activity.activity_object_ap_id}, deleting the activity"
|
||||||
|
)
|
||||||
|
await db_session.delete(like_activity)
|
||||||
|
else:
|
||||||
|
relates_to_outbox_object.likes_count = models.OutboxObject.likes_count + 1
|
||||||
|
|
||||||
|
notif = models.Notification(
|
||||||
|
notification_type=models.NotificationType.LIKE,
|
||||||
|
actor_id=actor.id,
|
||||||
|
outbox_object_id=relates_to_outbox_object.id,
|
||||||
|
inbox_object_id=like_activity.id,
|
||||||
|
)
|
||||||
|
db_session.add(notif)
|
||||||
|
|
||||||
|
|
||||||
async def _process_transient_object(
|
async def _process_transient_object(
|
||||||
db_session: AsyncSession,
|
db_session: AsyncSession,
|
||||||
raw_object: ap.RawObject,
|
raw_object: ap.RawObject,
|
||||||
|
@ -1727,26 +1839,11 @@ async def save_to_inbox(
|
||||||
elif activity_ro.ap_type == "Move":
|
elif activity_ro.ap_type == "Move":
|
||||||
await _handle_move_activity(db_session, actor, inbox_object)
|
await _handle_move_activity(db_session, actor, inbox_object)
|
||||||
elif activity_ro.ap_type == "Delete":
|
elif activity_ro.ap_type == "Delete":
|
||||||
object_to_delete: models.InboxObject | models.Actor | None
|
|
||||||
if relates_to_inbox_object:
|
|
||||||
object_to_delete = relates_to_inbox_object
|
|
||||||
elif inbox_object.activity_object_ap_id:
|
|
||||||
# If it's not a Delete for an inbox object, it may be related to
|
|
||||||
# an actor
|
|
||||||
try:
|
|
||||||
object_to_delete = await fetch_actor(
|
|
||||||
db_session,
|
|
||||||
inbox_object.activity_object_ap_id,
|
|
||||||
save_if_not_found=False,
|
|
||||||
)
|
|
||||||
except ap.ObjectNotFoundError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
await _handle_delete_activity(
|
await _handle_delete_activity(
|
||||||
db_session,
|
db_session,
|
||||||
actor,
|
actor,
|
||||||
inbox_object,
|
inbox_object,
|
||||||
object_to_delete,
|
relates_to_inbox_object,
|
||||||
forwarded_by_actor=forwarded_by_actor,
|
forwarded_by_actor=forwarded_by_actor,
|
||||||
)
|
)
|
||||||
elif activity_ro.ap_type == "Follow":
|
elif activity_ro.ap_type == "Follow":
|
||||||
|
@ -1811,124 +1908,26 @@ async def save_to_inbox(
|
||||||
"Received a reaction for an unknown activity: "
|
"Received a reaction for an unknown activity: "
|
||||||
f"{activity_ro.activity_object_ap_id}"
|
f"{activity_ro.activity_object_ap_id}"
|
||||||
)
|
)
|
||||||
|
await db_session.delete(inbox_object)
|
||||||
else:
|
else:
|
||||||
# TODO(ts): support reactions
|
# TODO(ts): support reactions
|
||||||
pass
|
pass
|
||||||
elif activity_ro.ap_type == "Like":
|
elif activity_ro.ap_type == "Like":
|
||||||
if not relates_to_outbox_object:
|
await _handle_like_activity(
|
||||||
logger.info(
|
db_session,
|
||||||
"Received a like for an unknown activity: "
|
actor,
|
||||||
f"{activity_ro.activity_object_ap_id}, deleting the activity"
|
inbox_object,
|
||||||
)
|
relates_to_outbox_object,
|
||||||
await db_session.delete(inbox_object)
|
relates_to_inbox_object,
|
||||||
else:
|
)
|
||||||
relates_to_outbox_object.likes_count = models.OutboxObject.likes_count + 1
|
|
||||||
|
|
||||||
notif = models.Notification(
|
|
||||||
notification_type=models.NotificationType.LIKE,
|
|
||||||
actor_id=actor.id,
|
|
||||||
outbox_object_id=relates_to_outbox_object.id,
|
|
||||||
inbox_object_id=inbox_object.id,
|
|
||||||
)
|
|
||||||
db_session.add(notif)
|
|
||||||
elif activity_ro.ap_type == "Announce":
|
elif activity_ro.ap_type == "Announce":
|
||||||
if relates_to_outbox_object:
|
await _handle_announce_activity(
|
||||||
# This is an announce for a local object
|
db_session,
|
||||||
relates_to_outbox_object.announces_count = (
|
actor,
|
||||||
models.OutboxObject.announces_count + 1
|
inbox_object,
|
||||||
)
|
relates_to_outbox_object,
|
||||||
|
relates_to_inbox_object,
|
||||||
notif = models.Notification(
|
)
|
||||||
notification_type=models.NotificationType.ANNOUNCE,
|
|
||||||
actor_id=actor.id,
|
|
||||||
outbox_object_id=relates_to_outbox_object.id,
|
|
||||||
inbox_object_id=inbox_object.id,
|
|
||||||
)
|
|
||||||
db_session.add(notif)
|
|
||||||
else:
|
|
||||||
# Only show the announce in the stream if it comes from an actor
|
|
||||||
# in the following collection
|
|
||||||
followings = await _get_following(db_session)
|
|
||||||
is_from_following = inbox_object.actor.ap_id in {
|
|
||||||
f.ap_actor_id for f in followings
|
|
||||||
}
|
|
||||||
|
|
||||||
# This is announce for a maybe unknown object
|
|
||||||
if relates_to_inbox_object:
|
|
||||||
# We already know about this object, show the announce in the
|
|
||||||
# stream if it's not already there, from an followed actor
|
|
||||||
# and if we haven't seen it recently
|
|
||||||
if (
|
|
||||||
now()
|
|
||||||
- as_utc(relates_to_inbox_object.ap_published_at) # type: ignore
|
|
||||||
) > timedelta(hours=1):
|
|
||||||
inbox_object.is_hidden_from_stream = not is_from_following
|
|
||||||
else:
|
|
||||||
# Save it as an inbox object
|
|
||||||
if not activity_ro.activity_object_ap_id:
|
|
||||||
raise ValueError("Should never happen")
|
|
||||||
announced_raw_object = await ap.fetch(activity_ro.activity_object_ap_id)
|
|
||||||
announced_actor = await fetch_actor(
|
|
||||||
db_session, ap.get_actor_id(announced_raw_object)
|
|
||||||
)
|
|
||||||
if not announced_actor.is_blocked:
|
|
||||||
announced_object = RemoteObject(
|
|
||||||
announced_raw_object, announced_actor
|
|
||||||
)
|
|
||||||
announced_inbox_object = models.InboxObject(
|
|
||||||
server=urlparse(announced_object.ap_id).hostname,
|
|
||||||
actor_id=announced_actor.id,
|
|
||||||
ap_actor_id=announced_actor.ap_id,
|
|
||||||
ap_type=announced_object.ap_type,
|
|
||||||
ap_id=announced_object.ap_id,
|
|
||||||
ap_context=announced_object.ap_context,
|
|
||||||
ap_published_at=announced_object.ap_published_at,
|
|
||||||
ap_object=announced_object.ap_object,
|
|
||||||
visibility=announced_object.visibility,
|
|
||||||
og_meta=await opengraph.og_meta_from_note(
|
|
||||||
db_session, announced_object
|
|
||||||
),
|
|
||||||
is_hidden_from_stream=True,
|
|
||||||
)
|
|
||||||
db_session.add(announced_inbox_object)
|
|
||||||
await db_session.flush()
|
|
||||||
inbox_object.relates_to_inbox_object_id = announced_inbox_object.id
|
|
||||||
inbox_object.is_hidden_from_stream = not is_from_following
|
|
||||||
|
|
||||||
elif activity_ro.ap_type in ["Like", "Announce"]:
|
|
||||||
if not relates_to_outbox_object:
|
|
||||||
logger.info(
|
|
||||||
f"Received {activity_ro.ap_type} for an unknown activity: "
|
|
||||||
f"{activity_ro.activity_object_ap_id}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
if activity_ro.ap_type == "Like":
|
|
||||||
relates_to_outbox_object.likes_count = (
|
|
||||||
models.OutboxObject.likes_count + 1
|
|
||||||
)
|
|
||||||
|
|
||||||
notif = models.Notification(
|
|
||||||
notification_type=models.NotificationType.LIKE,
|
|
||||||
actor_id=actor.id,
|
|
||||||
outbox_object_id=relates_to_outbox_object.id,
|
|
||||||
inbox_object_id=inbox_object.id,
|
|
||||||
)
|
|
||||||
db_session.add(notif)
|
|
||||||
elif activity_ro.ap_type == "Announce":
|
|
||||||
relates_to_outbox_object.announces_count = (
|
|
||||||
models.OutboxObject.announces_count + 1
|
|
||||||
)
|
|
||||||
|
|
||||||
notif = models.Notification(
|
|
||||||
notification_type=models.NotificationType.ANNOUNCE,
|
|
||||||
actor_id=actor.id,
|
|
||||||
outbox_object_id=relates_to_outbox_object.id,
|
|
||||||
inbox_object_id=inbox_object.id,
|
|
||||||
)
|
|
||||||
db_session.add(notif)
|
|
||||||
else:
|
|
||||||
raise ValueError("Should never happen")
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Received an unknown {inbox_object.ap_type} object")
|
logger.warning(f"Received an unknown {inbox_object.ap_type} object")
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,6 @@ _RESIZED_CACHE: MutableMapping[tuple[str, int], tuple[bytes, str, Any]] = LFUCac
|
||||||
# Next:
|
# Next:
|
||||||
# - fix issue with followers from a blocked server (skip it?)
|
# - fix issue with followers from a blocked server (skip it?)
|
||||||
# - CORS webfinger endpoint
|
# - CORS webfinger endpoint
|
||||||
# - support actor delete
|
|
||||||
# - allow to share old notes
|
# - allow to share old notes
|
||||||
# - allow to interact with object not in anybox (i.e. like from a lookup)
|
# - allow to interact with object not in anybox (i.e. like from a lookup)
|
||||||
# - only show 10 most recent threads in DMs
|
# - only show 10 most recent threads in DMs
|
||||||
|
|
Loading…
Reference in a new issue