mirror of
https://git.sr.ht/~tsileo/microblog.pub
synced 2024-11-14 18:54:27 +00:00
Improve Webfinger
This commit is contained in:
parent
e378ec94e0
commit
3097dbebe9
1 changed files with 74 additions and 14 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
@ -8,33 +9,85 @@ from app import config
|
||||||
from app.utils.url import check_url
|
from app.utils.url import check_url
|
||||||
|
|
||||||
|
|
||||||
|
async def get_webfinger_via_host_meta(host: str) -> str | None:
|
||||||
|
resp: httpx.Response | None = None
|
||||||
|
is_404 = False
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
for i, proto in enumerate({"http", "https"}):
|
||||||
|
try:
|
||||||
|
url = f"{proto}://{host}/.well-known/host-meta"
|
||||||
|
check_url(url)
|
||||||
|
resp = await client.get(
|
||||||
|
url,
|
||||||
|
headers={
|
||||||
|
"User-Agent": config.USER_AGENT,
|
||||||
|
},
|
||||||
|
follow_redirects=True,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
break
|
||||||
|
except httpx.HTTPStatusError as http_error:
|
||||||
|
logger.exception("HTTP error")
|
||||||
|
if http_error.response.status_code in [403, 404, 410]:
|
||||||
|
is_404 = True
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
except httpx.HTTPError:
|
||||||
|
logger.exception("req failed")
|
||||||
|
# If we tried https first and the domain is "http only"
|
||||||
|
if i == 0:
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
if is_404:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if resp:
|
||||||
|
tree = ET.fromstring(resp.text)
|
||||||
|
maybe_link = tree.find(
|
||||||
|
"./{http://docs.oasis-open.org/ns/xri/xrd-1.0}Link[@rel='lrdd']"
|
||||||
|
)
|
||||||
|
if maybe_link is not None:
|
||||||
|
return maybe_link.attrib.get("template")
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def webfinger(
|
async def webfinger(
|
||||||
resource: str,
|
resource: str,
|
||||||
|
webfinger_url: str | None = None,
|
||||||
) -> dict[str, Any] | None: # noqa: C901
|
) -> dict[str, Any] | None: # noqa: C901
|
||||||
"""Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL."""
|
"""Mastodon-like WebFinger resolution to retrieve the activity stream Actor URL."""
|
||||||
resource = resource.strip()
|
resource = resource.strip()
|
||||||
logger.info(f"performing webfinger resolution for {resource}")
|
logger.info(f"performing webfinger resolution for {resource}")
|
||||||
protos = ["https", "http"]
|
urls = []
|
||||||
if resource.startswith("http://"):
|
host = None
|
||||||
protos.reverse()
|
if webfinger_url:
|
||||||
host = urlparse(resource).netloc
|
urls = [webfinger_url]
|
||||||
elif resource.startswith("https://"):
|
|
||||||
host = urlparse(resource).netloc
|
|
||||||
else:
|
else:
|
||||||
if resource.startswith("acct:"):
|
if resource.startswith("http://"):
|
||||||
resource = resource[5:]
|
host = urlparse(resource).netloc
|
||||||
if resource.startswith("@"):
|
url = f"http://{host}/.well-known/webfinger"
|
||||||
resource = resource[1:]
|
elif resource.startswith("https://"):
|
||||||
_, host = resource.split("@", 1)
|
host = urlparse(resource).netloc
|
||||||
resource = "acct:" + resource
|
url = f"https://{host}/.well-known/webfinger"
|
||||||
|
else:
|
||||||
|
protos = ["https", "http"]
|
||||||
|
_, host = resource.split("@", 1)
|
||||||
|
urls = [f"{proto}://{host}/.well-known/webfinger" for proto in protos]
|
||||||
|
|
||||||
|
if resource.startswith("acct:"):
|
||||||
|
resource = resource[5:]
|
||||||
|
if resource.startswith("@"):
|
||||||
|
resource = resource[1:]
|
||||||
|
resource = "acct:" + resource
|
||||||
|
|
||||||
is_404 = False
|
is_404 = False
|
||||||
|
|
||||||
resp: httpx.Response | None = None
|
resp: httpx.Response | None = None
|
||||||
async with httpx.AsyncClient() as client:
|
async with httpx.AsyncClient() as client:
|
||||||
for i, proto in enumerate(protos):
|
for i, url in enumerate(urls):
|
||||||
try:
|
try:
|
||||||
url = f"{proto}://{host}/.well-known/webfinger"
|
|
||||||
check_url(url)
|
check_url(url)
|
||||||
resp = await client.get(
|
resp = await client.get(
|
||||||
url,
|
url,
|
||||||
|
@ -58,7 +111,14 @@ async def webfinger(
|
||||||
if i == 0:
|
if i == 0:
|
||||||
continue
|
continue
|
||||||
break
|
break
|
||||||
|
|
||||||
if is_404:
|
if is_404:
|
||||||
|
if not webfinger_url and host:
|
||||||
|
if webfinger_url := (await get_webfinger_via_host_meta(host)):
|
||||||
|
return await webfinger(
|
||||||
|
resource,
|
||||||
|
webfinger_url=webfinger_url,
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if resp:
|
if resp:
|
||||||
|
|
Loading…
Reference in a new issue