microblog.pub/utils/opengraph.py
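"""Fetch Open Graph metadata for links found in notes."""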

import logging
import mimetypes
from typing import Any
from typing import Dict
from typing import Set
from urllib.parse import urlparse

import opengraph
import requests
from bs4 import BeautifulSoup
from little_boxes import activitypub as ap
from little_boxes.errors import NotAnActivityError
from little_boxes.urlutils import check_url
from little_boxes.urlutils import is_url_valid

from .lookup import lookup

logger = logging.getLogger(__name__)


def links_from_note(note: Dict[str, Any]) -> Set[str]:
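    """Extract external HTTP(S) links from a note's HTML content.

    Links pointing back to the note's own host are ignored.
    """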
    note_host = urlparse(ap._get_id(note["id"]) or "").netloc

    links = set()
    if "content" in note:
        soup = BeautifulSoup(note["content"], "html5lib")
        for link in soup.find_all("a"):
            h = link.get("href")
            ph = urlparse(h)
            if (
                ph.scheme in {"http", "https"}
                and ph.netloc != note_host
                and is_url_valid(h)
            ):
                links.add(h)

    # FIXME(tsileo): support summary and name fields
    return links


def fetch_og_metadata(user_agent, links):
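    """Fetch Open Graph metadata for the given links.

    Returns a list of metadata dicts, one per link that yielded a usable
    "url" entry.
    """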
    res = []
    for l in links:
        # Try to skip media early
        mimetype, _ = mimetypes.guess_type(l)
        if mimetype and mimetype.split("/")[0] in ["image", "video", "audio"]:
            logger.info(f"skipping media link {l}")
            continue

        check_url(l)

        # Remove any AP objects
        try:
            lookup(l)
            continue
        except NotAnActivityError:
            pass
        except Exception:
            logger.exception(f"skipping {l} because of issues during AP lookup")
            continue
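
        # Do a cheap HEAD request first so non-HTML responses can be skipped
        # without downloading the whole body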
        try:
            h = requests.head(
                l, headers={"User-Agent": user_agent}, timeout=3, allow_redirects=True
            )
            h.raise_for_status()
        except requests.HTTPError as http_err:
            logger.debug(
                f"failed to HEAD {l}, got a {http_err.response.status_code}: {http_err.response.text}"
            )
            continue
        except requests.RequestException as err:
            logger.debug(f"failed to HEAD {l}: {err!r}")
            continue

        # Default to "" so a missing Content-Type header does not blow up
        if not h.headers.get("content-type", "").startswith("text/html"):
            logger.debug(f"skipping {l} for bad content type")
            continue
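
        # The link looks like a regular HTML page, fetch it and parse its
        # Open Graph metadata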
        try:
            r = requests.get(
                l, headers={"User-Agent": user_agent}, timeout=5, allow_redirects=True
            )
            r.raise_for_status()
        except requests.HTTPError as http_err:
            logger.debug(
                f"failed to GET {l}, got a {http_err.response.status_code}: {http_err.response.text}"
            )
            continue
        except requests.RequestException as err:
            logger.debug(f"failed to GET {l}: {err!r}")
            continue

        r.encoding = "UTF-8"
        html = r.text
        try:
            data = dict(opengraph.OpenGraph(html=html))
        except Exception:
            logger.exception(f"failed to parse {l}")
            continue

        # Keep track of the fetched URL as some crappy websites use relative URLs everywhere
        data["_input_url"] = l
        u = urlparse(l)

        # If it's a relative URL, build the absolute version
        if "image" in data and data["image"].startswith("/"):
            data["image"] = u._replace(
                path=data["image"], params="", query="", fragment=""
            ).geturl()
        if "url" in data and data["url"].startswith("/"):
            data["url"] = u._replace(
                path=data["url"], params="", query="", fragment=""
            ).geturl()

        if data.get("url"):
            res.append(data)

    return res
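

# A minimal usage sketch (hypothetical caller, not part of this module):
#
#     links = links_from_note(note)          # note is an ActivityPub note dict
#     cards = fetch_og_metadata(USER_AGENT, links)
#     for card in cards:
#         print(card["url"], card.get("title"))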