microblog.pub/app/templates.py

421 lines
9.5 KiB
Python
Raw Normal View History

2022-06-22 18:11:22 +00:00
import base64
from datetime import datetime
from datetime import timezone
from functools import lru_cache
from typing import Any
2022-07-04 17:43:37 +00:00
from typing import Callable
2022-06-22 18:11:22 +00:00
from urllib.parse import urlparse
import bleach
2022-06-27 18:55:44 +00:00
import emoji
2022-06-26 19:54:07 +00:00
import html2text
2022-07-08 10:10:20 +00:00
import humanize
2022-06-22 18:11:22 +00:00
from bs4 import BeautifulSoup # type: ignore
from dateutil.parser import parse
2022-06-22 18:11:22 +00:00
from fastapi import Request
from fastapi.templating import Jinja2Templates
2022-06-26 08:01:26 +00:00
from loguru import logger
2022-06-29 06:56:39 +00:00
from sqlalchemy import func
from sqlalchemy import select
2022-06-22 18:11:22 +00:00
from starlette.templating import _TemplateResponse as TemplateResponse
from app import activitypub as ap
2022-06-27 18:55:44 +00:00
from app import config
2022-06-22 18:11:22 +00:00
from app import models
from app.actor import LOCAL_ACTOR
from app.ap_object import Attachment
2022-06-26 08:01:26 +00:00
from app.ap_object import Object
2022-06-23 19:07:20 +00:00
from app.config import BASE_URL
2022-08-13 13:35:39 +00:00
from app.config import CSS_HASH
2022-08-24 19:18:30 +00:00
from app.config import CUSTOM_FOOTER
2022-06-22 18:11:22 +00:00
from app.config import DEBUG
from app.config import VERSION
from app.config import generate_csrf_token
from app.config import session_serializer
2022-06-29 18:43:17 +00:00
from app.database import AsyncSession
2022-06-26 08:01:26 +00:00
from app.media import proxied_media_url
2022-08-04 17:11:14 +00:00
from app.utils import privacy_replace
from app.utils.datetime import now
2022-06-22 19:15:07 +00:00
from app.utils.highlight import HIGHLIGHT_CSS
from app.utils.highlight import highlight
2022-06-22 18:11:22 +00:00
2022-07-16 06:07:58 +00:00
# Shared Jinja2 environment for every page; templates are loaded from
# "app/templates" and the custom filters defined in this module are attached
# to `_templates.env.filters` at the bottom of the file.
_templates = Jinja2Templates(
    directory="app/templates",
    # Whitespace-control options forwarded to the Jinja2 environment.
    trim_blocks=True,
    lstrip_blocks=True,
)
2022-06-22 18:11:22 +00:00
2022-06-26 19:54:07 +00:00
# Module-wide HTML -> text converter used by the `html2text` template filter.
# Links and images are ignored in the generated text.
H2T = html2text.HTML2Text()
H2T.ignore_links = True
H2T.ignore_images = True
2022-06-22 18:11:22 +00:00
def _filter_domain(text: str) -> str:
    """Return the hostname component of the URL ``text``.

    Raises:
        ValueError: if ``urlparse`` finds no hostname in ``text``.
    """
    if hostname := urlparse(text).hostname:
        return hostname
    raise ValueError(f"No hostname for {text}")
def _media_proxy_url(url: str | None) -> str:
if not url:
return "/static/nopic.png"
2022-06-23 19:07:20 +00:00
if url.startswith(BASE_URL):
2022-06-22 18:11:22 +00:00
return url
encoded_url = base64.urlsafe_b64encode(url.encode()).decode()
return f"/proxy/media/{encoded_url}"
def is_current_user_admin(request: Request) -> bool:
    """Tell whether ``request`` carries a logged-in admin session.

    The ``session`` cookie is decoded with ``session_serializer.loads`` using
    a ``max_age`` of 12 hours. A missing cookie, or one that fails to decode
    for any reason, is treated as "not admin".

    Args:
        request: the incoming request; only ``request.cookies`` is read.

    Returns:
        ``True`` only when the decoded session holds a truthy
        ``is_logged_in`` entry, ``False`` otherwise.
    """
    session_cookie = request.cookies.get("session")
    if not session_cookie:
        return False

    try:
        loaded_session = session_serializer.loads(
            session_cookie,
            max_age=3600 * 12,
        )
    except Exception:
        # Deliberate best effort: a cookie that cannot be decoded must never
        # break page rendering, it simply means the visitor is anonymous.
        return False

    # `.get` yields None when the key is absent; coerce so the declared
    # `bool` return type actually holds.
    return bool(loaded_session.get("is_logged_in"))
2022-06-29 18:43:17 +00:00
async def render_template(
    db_session: AsyncSession,
    request: Request,
    template: str,
    template_args: dict[str, Any] | None = None,
    status_code: int = 200,
) -> TemplateResponse:
    """Render ``template`` with the context shared by every page.

    Args:
        db_session: session used to run the count queries below.
        request: incoming request; exposed to the template and used for the
            admin check.
        template: name of the template to render.
        template_args: extra context entries. They are merged last, so they
            override any shared entry with the same key.
        status_code: HTTP status code of the response.

    Returns:
        The template response built by the shared Jinja2 environment.
    """
    extra_args = template_args if template_args is not None else {}
    is_admin = is_current_user_admin(request)
    csrf_token = generate_csrf_token()

    # The count of new notifications is only queried for the admin.
    if is_admin:
        notifications_count = await db_session.scalar(
            select(func.count(models.Notification.id)).where(
                models.Notification.is_new.is_(True)
            )
        )
    else:
        notifications_count = 0

    # Articles that are public, not deleted and not hidden from the homepage.
    articles_count = await db_session.scalar(
        select(func.count(models.OutboxObject.id)).where(
            models.OutboxObject.visibility == ap.VisibilityEnum.PUBLIC,
            models.OutboxObject.is_deleted.is_(False),
            models.OutboxObject.is_hidden_from_homepage.is_(False),
            models.OutboxObject.ap_type == "Article",
        )
    )
    followers_count = await db_session.scalar(
        select(func.count(models.Follower.id))
    )
    following_count = await db_session.scalar(
        select(func.count(models.Following.id))
    )

    context = {
        "request": request,
        "debug": DEBUG,
        "microblogpub_version": VERSION,
        "css_hash": CSS_HASH,
        "is_admin": is_admin,
        "csrf_token": csrf_token,
        "highlight_css": HIGHLIGHT_CSS,
        "visibility_enum": ap.VisibilityEnum,
        "notifications_count": notifications_count,
        "articles_count": articles_count,
        "local_actor": LOCAL_ACTOR,
        "followers_count": followers_count,
        "following_count": following_count,
        "actor_types": ap.ACTOR_TYPES,
        "custom_footer": CUSTOM_FOOTER,
        **extra_args,
    }
    return _templates.TemplateResponse(
        template,
        context,
        status_code=status_code,
    )
# HTML/templates helper
# HTML tags that survive sanitization; passed as `tags=` to `bleach.clean`
# in `_clean_html`. Each tag is listed exactly once.
ALLOWED_TAGS = [
    "a",
    "abbr",
    "acronym",
    "b",
    "br",
    "blockquote",
    "code",
    "pre",
    "em",
    "i",
    "li",
    "ol",
    "strong",
    "sup",
    "sub",
    "del",
    "ul",
    "span",
    "div",
    "p",
    "h1",
    "h2",
    "h3",
    "h4",
    "h5",
    "h6",
    "table",
    "th",
    "tr",
    "td",
    "thead",
    "tbody",
    "tfoot",
    "colgroup",
    "caption",
    "img",
]
# Values of the `class` attribute that `_allow_class` lets through.
# fmt: off
ALLOWED_CSS_CLASSES = [
    # microformats
    "h-card", "u-url", "mention",
    # code highlighting (wrapper classes first, then the short per-token
    # classes -- these look like Pygments' token class names)
    "highlight", "codehilite", "hll",
    "c", "err", "g", "k", "l", "n", "o", "x", "p",
    "ch", "cm", "cp", "cpf", "c1", "cs",
    "gd", "ge", "gr", "gh", "gi", "go", "gp", "gs", "gu", "gt",
    "kc", "kd", "kn", "kp", "kr", "kt",
    "ld", "m", "s",
    "na", "nb", "nc", "no", "nd", "ni", "ne", "nf", "nl", "nn", "nx", "py", "nt", "nv",
    "ow", "w",
    "mb", "mf", "mh", "mi", "mo",
    "sa", "sb", "sc", "dl", "sd", "s2", "se", "sh", "si", "sx", "sr", "s1", "ss",
    "bp", "fm", "vc", "vg", "vi", "vm", "il",
]
# fmt: on
2022-07-04 17:43:37 +00:00
def _allow_class(_tag: str, name: str, value: str) -> bool:
    """Attribute filter: keep only ``class`` attributes with an allowed value.

    The whole attribute value must be an entry of ``ALLOWED_CSS_CLASSES``;
    the tag name is not taken into account.
    """
    if name != "class":
        return False
    return value in ALLOWED_CSS_CLASSES
2022-07-14 13:16:58 +00:00
def _allow_img_attrs(_tag: str, name: str, value: str) -> bool:
    """Attribute filter for ``<img>``.

    ``src``, ``alt`` and ``title`` are always kept; ``class`` is kept only
    when its value is exactly ``inline-img``.
    """
    always_allowed = name in ("src", "alt", "title")
    inline_img_class = name == "class" and value == "inline-img"
    return always_allowed or inline_img_class
2022-07-04 17:43:37 +00:00
# Per-tag attribute allowlist passed as `attributes=` to `bleach.clean` in
# `_clean_html`. A value is either a fixed list of attribute names or a
# `(tag, name, value) -> bool` callable deciding attribute by attribute.
ALLOWED_ATTRIBUTES: dict[str, list[str] | Callable[[str, str, str], bool]] = {
    # Fixed attribute lists.
    "a": ["href", "title"],
    "abbr": ["title"],
    "acronym": ["title"],
    # Decided by callable.
    "img": _allow_img_attrs,
    "div": _allow_class,
    "span": _allow_class,
    "code": _allow_class,
}
@lru_cache(maxsize=256)
def _update_inline_imgs(content):
    """Rewrite every ``<img src=...>`` in ``content`` to a proxied inline image.

    Each image that has a ``src`` gets it replaced by
    ``_media_proxy_url(src) + "/740"`` (the suffix is presumably a size hint
    for the media route -- confirm against the route definition) and its
    ``class`` set to ``inline-img``. Content without any ``<img>`` is returned
    unchanged; otherwise the re-serialized ``<body>`` contents are returned.
    Results are memoized per ``content`` value.
    """
    document = BeautifulSoup(content, "html5lib")
    images = document.find_all("img")
    if not images:
        return content

    for image in images:
        src = image.attrs.get("src")
        if src:
            image.attrs["src"] = _media_proxy_url(src) + "/740"
            image["class"] = "inline-img"

    return document.find("body").decode_contents()
2022-06-26 08:01:26 +00:00
def _clean_html(html: str | None, note: Object) -> str:
    """Turn the HTML of ``note`` into markup that is safe to embed in a page.

    Args:
        html: raw HTML content; ``None`` is logged as an error and rendered
            as an empty string.
        note: the object the content belongs to; provides the custom emoji
            tags and the ``ap_id`` used to tell local from remote content.

    Returns:
        The processed HTML string.
    """
    if html is None:
        logger.error(f"{html=} for {note.ap_id}/{note.ap_object}")
        return ""

    # Steps that run on the raw HTML, before sanitization.
    highlighted = highlight(html)
    with_inline_imgs = _update_inline_imgs(highlighted)
    privacy_rewritten = privacy_replace.replace_content(with_inline_imgs)

    sanitized = bleach.clean(
        privacy_rewritten,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRIBUTES,
        strip=True,
    )

    # The emoji steps run AFTER sanitization on purpose: the <img> markup
    # they insert would otherwise have to pass the allowlists. They must
    # therefore only ever emit trusted, escaped HTML.
    with_custom_emojis = _replace_custom_emojis(sanitized, note)
    return _emojify(
        with_custom_emojis,
        is_local=note.ap_id.startswith(BASE_URL),
    )
def _timeago(original_dt: datetime) -> str:
    """Describe ``original_dt`` relative to the current time in natural language.

    ``humanize.naturaltime`` is called with naive datetimes on both sides: an
    aware input is first converted to UTC and stripped of its tzinfo, a naive
    input is used as is.
    """
    if original_dt.tzinfo:
        naive_dt = original_dt.astimezone(timezone.utc).replace(tzinfo=None)
    else:
        naive_dt = original_dt

    reference = now().replace(tzinfo=None)
    return humanize.naturaltime(naive_dt, when=reference)
2022-06-22 18:11:22 +00:00
def _has_media_type(attachment: Attachment, media_type_prefix: str) -> bool:
    """Tell whether the attachment's media type starts with ``media_type_prefix``.

    An attachment without a media type (``None`` or empty) never matches.
    """
    media_type = attachment.media_type
    if not media_type:
        return False
    return media_type.startswith(media_type_prefix)
2022-06-22 18:11:22 +00:00
2022-06-25 08:20:07 +00:00
def _format_date(dt: datetime) -> str:
    """Format ``dt`` like ``Jun 25, 2022, 08:20``."""
    return f"{dt:%b %d, %Y, %H:%M}"
def _pluralize(count: int, singular: str = "", plural: str = "s") -> str:
    """Return the suffix matching ``count``: ``plural`` above 1, else ``singular``."""
    return plural if count > 1 else singular
2022-06-26 08:01:26 +00:00
def _replace_custom_emojis(content: str, note: Object) -> str:
    """Replace custom-emoji names found in ``content`` with ``<img>`` tags.

    The emojis come from the entries of ``note.tags`` whose ``type`` is
    ``"Emoji"``; each maps its ``name`` to the proxied version of its
    ``icon.url``. Tags missing one of these keys, or whose name is not a
    non-empty string, are logged and skipped.

    Args:
        content: HTML in which the emoji names are looked up verbatim.
        note: object providing the ``tags`` list.

    Returns:
        ``content`` with every occurrence of each emoji name replaced.
    """
    # Function-scope import keeps this fix self-contained (stdlib only).
    from html import escape

    emoji_urls: dict[str, str] = {}
    for tag in note.tags:
        if tag.get("type") != "Emoji":
            continue
        try:
            name = tag["name"]
            url = proxied_media_url(tag["icon"]["url"])
        except KeyError:
            logger.warning(f"Failed to parse custom emoji {tag=}")
            continue
        if not isinstance(name, str) or not name:
            # An empty name would make `str.replace` insert the image between
            # every character of the content.
            logger.warning(f"Ignoring custom emoji with invalid name {tag=}")
            continue
        emoji_urls[name] = url

    for emoji_name, emoji_url in emoji_urls.items():
        # Name and URL come from the (possibly remote) note and this markup is
        # spliced in after sanitization, so they must be escaped before being
        # placed inside attribute values.
        safe_name = escape(emoji_name, quote=True)
        safe_url = escape(str(emoji_url), quote=True)
        content = content.replace(
            emoji_name,
            f'<img class="custom-emoji" src="{safe_url}" title="{safe_name}" alt="{safe_name}">',  # noqa: E501
        )

    return content
2022-06-26 19:54:07 +00:00
def _html2text(content: str) -> str:
    """Convert HTML ``content`` to text with the module-level ``H2T`` converter."""
    plain_text = H2T.handle(content)
    return plain_text
2022-06-30 07:25:13 +00:00
def _replace_emoji(u: str, _) -> str:
    """Render the unicode emoji ``u`` through ``config.EMOJI_TPL``.

    Used as the ``replace`` callback of ``emoji.replace_emoji``; the second
    positional argument supplied by that library is ignored.

    Args:
        u: the emoji as found in the text. It may consist of several code
            points (flags, ZWJ sequences, skin-tone modifiers).

    Returns:
        The template formatted with ``filename`` (lowercase hex code points)
        and ``raw`` (the original emoji).
    """
    # `ord()` only accepts a single character and raises TypeError on
    # multi-code-point emojis, so convert each code point separately. They are
    # joined with "-", which is how Twemoji names its files -- assumes the
    # emoji assets behind EMOJI_TPL follow that convention.
    filename = "-".join(hex(ord(char))[2:] for char in u)
    return config.EMOJI_TPL.format(filename=filename, raw=u)
2022-06-30 07:25:13 +00:00
def _emojify(text: str, is_local: bool) -> str:
    """Replace unicode emojis in ``text`` via ``_replace_emoji``, local content only.

    Text that is not local is returned untouched.
    """
    return emoji.replace_emoji(text, replace=_replace_emoji) if is_local else text
def _parse_datetime(dt: str) -> datetime:
    """Parse the date/time string ``dt`` with ``dateutil.parser.parse``."""
    parsed = parse(dt)
    return parsed
def _poll_item_pct(item: ap.RawObject, voters_count: int) -> int:
    """Return the share of ``voters_count`` that chose ``item``, as a whole percentage.

    The vote count is read from ``item["replies"]["totalItems"]`` and the
    result is truncated by ``int()``. With no voters the share is 0.
    """
    if voters_count != 0:
        votes = item["replies"]["totalItems"]
        return int(votes * 100 / voters_count)
    return 0
2022-06-22 18:11:22 +00:00
# Expose the helpers of this module to the templates as Jinja2 filters.
_templates.env.filters.update(
    {
        "domain": _filter_domain,
        "media_proxy_url": _media_proxy_url,
        "clean_html": _clean_html,
        "timeago": _timeago,
        "format_date": _format_date,
        "has_media_type": _has_media_type,
        "html2text": _html2text,
        "emojify": _emojify,
        "pluralize": _pluralize,
        "parse_datetime": _parse_datetime,
        "poll_item_pct": _poll_item_pct,
        "privacy_replace_url": privacy_replace.replace_url,
    }
)