Prefetch some notes when following an actor

This commit is contained in:
Thomas Sileo 2022-08-15 19:20:56 +02:00
parent c711096262
commit 59af633c6c
4 changed files with 63 additions and 3 deletions

View file

@ -166,6 +166,7 @@ async def parse_collection( # noqa: C901
url: str | None = None, url: str | None = None,
payload: RawObject | None = None, payload: RawObject | None = None,
level: int = 0, level: int = 0,
limit: int = 0,
) -> list[RawObject]: ) -> list[RawObject]:
"""Resolve/fetch a `Collection`/`OrderedCollection`.""" """Resolve/fetch a `Collection`/`OrderedCollection`."""
if level > 3: if level > 3:
@ -193,7 +194,9 @@ async def parse_collection( # noqa: C901
if "first" in payload: if "first" in payload:
if isinstance(payload["first"], str): if isinstance(payload["first"], str):
out.extend( out.extend(
await parse_collection(url=payload["first"], level=level + 1) await parse_collection(
url=payload["first"], level=level + 1, limit=limit
)
) )
else: else:
if "orderedItems" in payload["first"]: if "orderedItems" in payload["first"]:
@ -202,7 +205,9 @@ async def parse_collection( # noqa: C901
out.extend(payload["first"]["items"]) out.extend(payload["first"]["items"])
n = payload["first"].get("next") n = payload["first"].get("next")
if n: if n:
out.extend(await parse_collection(url=n, level=level + 1)) out.extend(
await parse_collection(url=n, level=level + 1, limit=limit)
)
return out return out
while payload: while payload:
@ -212,7 +217,7 @@ async def parse_collection( # noqa: C901
if "items" in payload: if "items" in payload:
out.extend(payload["items"]) out.extend(payload["items"])
n = payload.get("next") n = payload.get("next")
if n is None: if n is None or (limit > 0 and len(out) >= limit):
break break
payload = await fetch(n) payload = await fetch(n)
else: else:

View file

@ -68,6 +68,10 @@ class Actor:
def inbox_url(self) -> str: def inbox_url(self) -> str:
return self.ap_actor["inbox"] return self.ap_actor["inbox"]
@property
def outbox_url(self) -> str:
return self.ap_actor["outbox"]
@property @property
def shared_inbox_url(self) -> str: def shared_inbox_url(self) -> str:
return self.ap_actor.get("endpoints", {}).get("sharedInbox") or self.inbox_url return self.ap_actor.get("endpoints", {}).get("sharedInbox") or self.inbox_url

View file

@ -132,6 +132,8 @@ async def send_like(db_session: AsyncSession, ap_object_id: str) -> None:
raw_object = await ap.fetch(ap.get_id(ap_object_id)) raw_object = await ap.fetch(ap.get_id(ap_object_id))
await save_object_to_inbox(db_session, raw_object) await save_object_to_inbox(db_session, raw_object)
await db_session.commit() await db_session.commit()
# XXX: we need to reload it as lazy-loading the actor will fail
# (asyncio SQLAlchemy issue)
inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id)
if not inbox_object: if not inbox_object:
raise ValueError("Should never happen") raise ValueError("Should never happen")
@ -165,6 +167,8 @@ async def send_announce(db_session: AsyncSession, ap_object_id: str) -> None:
raw_object = await ap.fetch(ap.get_id(ap_object_id)) raw_object = await ap.fetch(ap.get_id(ap_object_id))
await save_object_to_inbox(db_session, raw_object) await save_object_to_inbox(db_session, raw_object)
await db_session.commit() await db_session.commit()
# XXX: we need to reload it as lazy-loading the actor will fail
# (asyncio SQLAlchemy issue)
inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id) inbox_object = await get_inbox_object_by_ap_id(db_session, ap_object_id)
if not inbox_object: if not inbox_object:
raise ValueError("Should never happen") raise ValueError("Should never happen")
@ -1615,6 +1619,9 @@ async def save_to_inbox(
inbox_object_id=inbox_object.id, inbox_object_id=inbox_object.id,
) )
db_session.add(notif) db_session.add(notif)
if activity_ro.ap_type == "Accept":
# Pre-fetch the latest activities
await _prefetch_actor_outbox(db_session, actor)
else: else:
logger.info( logger.info(
"Received an Accept for an unsupported activity: " "Received an Accept for an unsupported activity: "
@ -1750,10 +1757,41 @@ async def save_to_inbox(
await db_session.commit() await db_session.commit()
async def _prefetch_actor_outbox(
db_session: AsyncSession,
actor: models.Actor,
) -> None:
"""Try to fetch some notes to fill the stream"""
saved = 0
outbox = await ap.parse_collection(actor.outbox_url, limit=20)
for activity in outbox:
activity_id = ap.get_id(activity)
raw_activity = await ap.fetch(activity_id)
if ap.as_list(raw_activity["type"])[0] == "Create":
obj = await ap.get_object(raw_activity)
saved_inbox_object = await get_inbox_object_by_ap_id(
db_session, ap.get_id(obj)
)
if not saved_inbox_object:
saved_inbox_object = await save_object_to_inbox(db_session, obj)
if not saved_inbox_object.in_reply_to:
saved_inbox_object.is_hidden_from_stream = False
saved += 1
if saved >= 5:
break
# commit is performed by the called
async def save_object_to_inbox( async def save_object_to_inbox(
db_session: AsyncSession, db_session: AsyncSession,
raw_object: ap.RawObject, raw_object: ap.RawObject,
) -> models.InboxObject: ) -> models.InboxObject:
"""Used to save unknown object before intetacting with them, i.e. to like
an object that was looked up, or prefill the inbox when an actor accepted
a follow request."""
obj_actor = await fetch_actor(db_session, ap.get_actor_id(raw_object)) obj_actor = await fetch_actor(db_session, ap.get_actor_id(raw_object))
ro = RemoteObject(raw_object, actor=obj_actor) ro = RemoteObject(raw_object, actor=obj_actor)

View file

@ -7,6 +7,7 @@ import fastapi
import httpx import httpx
import respx import respx
from app import activitypub as ap
from app import actor from app import actor
from app import httpsig from app import httpsig
from app import models from app import models
@ -45,6 +46,18 @@ def setup_remote_actor(respx_mock: respx.MockRouter) -> actor.RemoteActor:
username="toto", username="toto",
public_key="pk", public_key="pk",
) )
respx_mock.get(ra.ap_id + "/outbox").mock(
return_value=httpx.Response(
200,
json={
"@context": ap.AS_EXTENDED_CTX,
"id": f"{ra.ap_id}/outbox",
"type": "OrderedCollection",
"totalItems": 0,
"orderedItems": [],
},
)
)
respx_mock.get(ra.ap_id).mock(return_value=httpx.Response(200, json=ra.ap_actor)) respx_mock.get(ra.ap_id).mock(return_value=httpx.Response(200, json=ra.ap_actor))
return ra return ra