From d371e3cd4fcc8832045dee5ce968095589629cf7 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Thu, 30 Jun 2022 00:28:07 +0200 Subject: [PATCH] Make most of the HTTP requests async --- app/activitypub.py | 45 +++++++----------- app/actor.py | 2 +- app/ap_object.py | 23 ++++++--- app/boxes.py | 10 ++-- app/httpsig.py | 6 +-- app/lookup.py | 8 ++-- app/main.py | 2 +- app/source.py | 2 +- app/webfinger.py | 57 ++++++++++++----------- tests/test_inbox.py | 9 ++-- tests/test_outbox.py | 3 +- tests/test_process_outgoing_activities.py | 3 +- 12 files changed, 88 insertions(+), 82 deletions(-) diff --git a/app/activitypub.py b/app/activitypub.py index 37686cc..8d5b42d 100644 --- a/app/activitypub.py +++ b/app/activitypub.py @@ -103,16 +103,17 @@ class NotAnObjectError(Exception): self.resp = resp -def fetch(url: str, params: dict[str, Any] | None = None) -> dict[str, Any]: - resp = httpx.get( - url, - headers={ - "User-Agent": config.USER_AGENT, - "Accept": config.AP_CONTENT_TYPE, - }, - params=params, - follow_redirects=True, - ) +async def fetch(url: str, params: dict[str, Any] | None = None) -> dict[str, Any]: + async with httpx.AsyncClient() as client: + resp = await client.get( + url, + headers={ + "User-Agent": config.USER_AGENT, + "Accept": config.AP_CONTENT_TYPE, + }, + params=params, + follow_redirects=True, + ) # Special handling for deleted object if resp.status_code == 410: @@ -125,7 +126,7 @@ def fetch(url: str, params: dict[str, Any] | None = None) -> dict[str, Any]: raise NotAnObjectError(url, resp) -def parse_collection( # noqa: C901 +async def parse_collection( # noqa: C901 url: str | None = None, payload: RawObject | None = None, level: int = 0, @@ -137,7 +138,7 @@ def parse_collection( # noqa: C901 # Go through all the pages out: list[RawObject] = [] if url: - payload = fetch(url) + payload = await fetch(url) if not payload: raise ValueError("must at least prove a payload or an URL") @@ -155,7 +156,9 @@ def parse_collection( # noqa: C901 return payload["items"] if "first" in payload: if isinstance(payload["first"], str): - out.extend(parse_collection(url=payload["first"], level=level + 1)) + out.extend( + await parse_collection(url=payload["first"], level=level + 1) + ) else: if "orderedItems" in payload["first"]: out.extend(payload["first"]["orderedItems"]) @@ -163,7 +166,7 @@ def parse_collection( # noqa: C901 out.extend(payload["first"]["items"]) n = payload["first"].get("next") if n: - out.extend(parse_collection(url=n, level=level + 1)) + out.extend(await parse_collection(url=n, level=level + 1)) return out while payload: @@ -175,7 +178,7 @@ def parse_collection( # noqa: C901 n = payload.get("next") if n is None: break - payload = fetch(n) + payload = await fetch(n) else: raise ValueError("unexpected activity type {}".format(payload["type"])) @@ -263,18 +266,6 @@ def remove_context(raw_object: RawObject) -> RawObject: return a -def get(url: str, params: dict[str, Any] | None = None) -> dict[str, Any]: - resp = httpx.get( - url, - headers={"User-Agent": config.USER_AGENT, "Accept": config.AP_CONTENT_TYPE}, - params=params, - follow_redirects=True, - auth=auth, - ) - resp.raise_for_status() - return resp.json() - - def post(url: str, payload: dict[str, Any]) -> httpx.Response: resp = httpx.post( url, diff --git a/app/actor.py b/app/actor.py index 69789b6..230bf1c 100644 --- a/app/actor.py +++ b/app/actor.py @@ -160,7 +160,7 @@ async def fetch_actor(db_session: AsyncSession, actor_id: str) -> "ActorModel": if existing_actor: return existing_actor - ap_actor = ap.get(actor_id) + ap_actor = await ap.fetch(actor_id) return await save_actor(db_session, ap_actor) diff --git a/app/ap_object.py b/app/ap_object.py index 92b6920..db79274 100644 --- a/app/ap_object.py +++ b/app/ap_object.py @@ -178,26 +178,35 @@ class Attachment(BaseModel): class RemoteObject(Object): - def __init__(self, raw_object: ap.RawObject, actor: Actor | None = None): + def __init__(self, raw_object: ap.RawObject, actor: Actor): self._raw_object = raw_object - self._actor: Actor + self._actor = actor + if self._actor.ap_id != ap.get_actor_id(self._raw_object): + raise ValueError(f"Invalid actor {self._actor.ap_id}") + + @classmethod + async def from_raw_object( + cls, + raw_object: ap.RawObject, + actor: Actor | None = None, + ): # Pre-fetch the actor actor_id = ap.get_actor_id(raw_object) if actor_id == LOCAL_ACTOR.ap_id: - self._actor = LOCAL_ACTOR + _actor = LOCAL_ACTOR elif actor: if actor.ap_id != actor_id: raise ValueError( f"Invalid actor, got {actor.ap_id}, " f"expected {actor_id}" ) - self._actor = actor + _actor = actor # type: ignore else: - self._actor = RemoteActor( - ap_actor=ap.fetch(ap.get_actor_id(raw_object)), + _actor = RemoteActor( + ap_actor=await ap.fetch(ap.get_actor_id(raw_object)), ) - self._og_meta = None + return cls(raw_object, _actor) @property def og_meta(self) -> list[dict[str, Any]] | None: diff --git a/app/boxes.py b/app/boxes.py index bd11431..37cdb0f 100644 --- a/app/boxes.py +++ b/app/boxes.py @@ -52,7 +52,7 @@ async def save_outbox_object( relates_to_actor_id: int | None = None, source: str | None = None, ) -> models.OutboxObject: - ra = RemoteObject(raw_object) + ra = await RemoteObject.from_raw_object(raw_object) outbox_object = models.OutboxObject( public_id=public_id, @@ -368,13 +368,13 @@ async def _compute_recipients( continue # Fetch the object - raw_object = ap.fetch(r) + raw_object = await ap.fetch(r) if raw_object.get("type") in ap.ACTOR_TYPES: saved_actor = await save_actor(db_session, raw_object) recipients.add(saved_actor.shared_inbox_url or saved_actor.inbox_url) else: # Assume it's a collection of actors - for raw_actor in ap.parse_collection(payload=raw_object): + for raw_actor in await ap.parse_collection(payload=raw_object): actor = RemoteActor(raw_actor) recipients.add(actor.shared_inbox_url or actor.inbox_url) @@ -741,7 +741,7 @@ async def save_to_inbox(db_session: AsyncSession, raw_object: ap.RawObject) -> N # Save it as an inbox object if not ra.activity_object_ap_id: raise ValueError("Should never happen") - announced_raw_object = ap.fetch(ra.activity_object_ap_id) + announced_raw_object = await ap.fetch(ra.activity_object_ap_id) announced_actor = await fetch_actor( db_session, ap.get_actor_id(announced_raw_object) ) @@ -830,7 +830,7 @@ async def fetch_actor_collection(db_session: AsyncSession, url: str) -> list[Act else: raise ValueError(f"internal collection for {url}) not supported") - return [RemoteActor(actor) for actor in ap.parse_collection(url)] + return [RemoteActor(actor) for actor in await ap.parse_collection(url)] @dataclass diff --git a/app/httpsig.py b/app/httpsig.py index c77aaa7..270ddfb 100644 --- a/app/httpsig.py +++ b/app/httpsig.py @@ -63,11 +63,11 @@ def _body_digest(body: bytes) -> str: @lru_cache(32) -def _get_public_key(key_id: str) -> Key: +async def _get_public_key(key_id: str) -> Key: # TODO: use DB to use cache actor from app import activitypub as ap - actor = ap.fetch(key_id) + actor = await ap.fetch(key_id) if actor["type"] == "Key": # The Key is not embedded in the Person k = Key(actor["owner"], actor["id"]) @@ -111,7 +111,7 @@ async def httpsig_checker( ) try: - k = _get_public_key(hsig["keyId"]) + k = await _get_public_key(hsig["keyId"]) except ap.ObjectIsGoneError: logger.info("Actor is gone") return HTTPSigInfo(has_valid_signature=False) diff --git a/app/lookup.py b/app/lookup.py index 0480e90..daa9ad7 100644 --- a/app/lookup.py +++ b/app/lookup.py @@ -10,13 +10,13 @@ from app.database import AsyncSession async def lookup(db_session: AsyncSession, query: str) -> Actor | RemoteObject: if query.startswith("@"): - query = webfinger.get_actor_url(query) # type: ignore # None check below + query = await webfinger.get_actor_url(query) # type: ignore # None check below if not query: raise ap.NotAnObjectError(query) try: - ap_obj = ap.fetch(query) + ap_obj = await ap.fetch(query) except ap.NotAnObjectError as not_an_object_error: resp = not_an_object_error.resp if not resp: @@ -26,7 +26,7 @@ async def lookup(db_session: AsyncSession, query: str) -> Actor | RemoteObject: if resp.headers.get("content-type", "").startswith("text/html"): for alternate in mf2py.parse(doc=resp.text).get("alternates", []): if alternate.get("type") == "application/activity+json": - alternate_obj = ap.fetch(alternate["url"]) + alternate_obj = await ap.fetch(alternate["url"]) if alternate_obj: ap_obj = alternate_obj @@ -37,4 +37,4 @@ async def lookup(db_session: AsyncSession, query: str) -> Actor | RemoteObject: actor = await fetch_actor(db_session, ap_obj["id"]) return actor else: - return RemoteObject(ap_obj) + return await RemoteObject.from_raw_object(ap_obj) diff --git a/app/main.py b/app/main.py index bee2c55..cc58571 100644 --- a/app/main.py +++ b/app/main.py @@ -604,7 +604,7 @@ async def post_remote_follow( if not profile.startswith("@"): profile = f"@{profile}" - remote_follow_template = get_remote_follow_template(profile) + remote_follow_template = await get_remote_follow_template(profile) if not remote_follow_template: raise HTTPException(status_code=404) diff --git a/app/source.py b/app/source.py index 1ae0a87..7ba4af5 100644 --- a/app/source.py +++ b/app/source.py @@ -52,7 +52,7 @@ async def _mentionify( ) ).scalar_one_or_none() if not actor: - actor_url = webfinger.get_actor_url(mention) + actor_url = await webfinger.get_actor_url(mention) if not actor_url: # FIXME(ts): raise an error? continue diff --git a/app/webfinger.py b/app/webfinger.py index 59b5199..addc122 100644 --- a/app/webfinger.py +++ b/app/webfinger.py @@ -7,7 +7,7 @@ from loguru import logger from app import config -def webfinger( +async def webfinger( resource: str, ) -> dict[str, Any] | None: # noqa: C901 """Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL.""" @@ -28,37 +28,38 @@ def webfinger( is_404 = False - for i, proto in enumerate(protos): - try: - url = f"{proto}://{host}/.well-known/webfinger" - resp = httpx.get( - url, - params={"resource": resource}, - headers={ - "User-Agent": config.USER_AGENT, - }, - ) - break - except httpx.HTTPStatusError as http_error: - logger.exception("HTTP error") - if http_error.response.status_code in [403, 404, 410]: - is_404 = True - continue - raise - except httpx.HTTPError: - logger.exception("req failed") - # If we tried https first and the domain is "http only" - if i == 0: - continue - break + async with httpx.AsyncClient() as client: + for i, proto in enumerate(protos): + try: + url = f"{proto}://{host}/.well-known/webfinger" + resp = await client.get( + url, + params={"resource": resource}, + headers={ + "User-Agent": config.USER_AGENT, + }, + ) + break + except httpx.HTTPStatusError as http_error: + logger.exception("HTTP error") + if http_error.response.status_code in [403, 404, 410]: + is_404 = True + continue + raise + except httpx.HTTPError: + logger.exception("req failed") + # If we tried https first and the domain is "http only" + if i == 0: + continue + break if is_404: return None return resp.json() -def get_remote_follow_template(resource: str) -> str | None: - data = webfinger(resource) +async def get_remote_follow_template(resource: str) -> str | None: + data = await webfinger(resource) if data is None: return None for link in data["links"]: @@ -67,13 +68,13 @@ def get_remote_follow_template(resource: str) -> str | None: return None -def get_actor_url(resource: str) -> str | None: +async def get_actor_url(resource: str) -> str | None: """Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL. Returns: the Actor URL or None if the resolution failed. """ - data = webfinger(resource) + data = await webfinger(resource) if data is None: return None for link in data["links"]: diff --git a/tests/test_inbox.py b/tests/test_inbox.py index 6f134fa..80bc8e7 100644 --- a/tests/test_inbox.py +++ b/tests/test_inbox.py @@ -43,7 +43,8 @@ def test_inbox_follow_request( factories.build_follow_activity( from_remote_actor=ra, for_remote_actor=LOCAL_ACTOR, - ) + ), + ra, ) with mock_httpsig_checker(ra): response = client.post( @@ -100,7 +101,8 @@ def test_inbox_accept_follow_request( from_remote_actor=LOCAL_ACTOR, for_remote_actor=ra, outbox_public_id=follow_id, - ) + ), + LOCAL_ACTOR, ) outbox_object = factories.OutboxObjectFactory.from_remote_object( follow_id, follow_from_outbox @@ -111,7 +113,8 @@ def test_inbox_accept_follow_request( factories.build_accept_activity( from_remote_actor=ra, for_remote_object=follow_from_outbox, - ) + ), + ra, ) with mock_httpsig_checker(ra): response = client.post( diff --git a/tests/test_outbox.py b/tests/test_outbox.py index 45bdda8..81ec561 100644 --- a/tests/test_outbox.py +++ b/tests/test_outbox.py @@ -112,7 +112,8 @@ def test_send_create_activity__with_followers( from_remote_actor=ra, for_remote_actor=LOCAL_ACTOR, outbox_public_id=follow_id, - ) + ), + ra, ) inbox_object = factories.InboxObjectFactory.from_remote_object( follow_from_inbox, actor diff --git a/tests/test_process_outgoing_activities.py b/tests/test_process_outgoing_activities.py index f013518..a65e6db 100644 --- a/tests/test_process_outgoing_activities.py +++ b/tests/test_process_outgoing_activities.py @@ -31,7 +31,8 @@ def _setup_outbox_object() -> models.OutboxObject: from_remote_actor=LOCAL_ACTOR, for_remote_actor=ra, outbox_public_id=follow_id, - ) + ), + LOCAL_ACTOR, ) outbox_object = factories.OutboxObjectFactory.from_remote_object( follow_id, follow_from_outbox