mirror of
https://git.sr.ht/~tsileo/microblog.pub
synced 2024-12-21 20:54:27 +00:00
Sign media URLs to avoid becoming an open proxy
Signatures are valid for ~1 week.
This commit is contained in:
parent
540b9d1470
commit
a4cfd65009
4 changed files with 41 additions and 9 deletions
|
@ -1,4 +1,5 @@
|
|||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import secrets
|
||||
from pathlib import Path
|
||||
|
@ -250,3 +251,7 @@ def verify_csrf_token(
|
|||
detail=f"The security token has expired, {please_try_again}",
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def hmac_sha256():
|
||||
return hmac.new(CONFIG.secret.encode(), digestmod=hashlib.sha256)
|
||||
|
|
11
app/main.py
11
app/main.py
|
@ -48,6 +48,7 @@ from app import boxes
|
|||
from app import config
|
||||
from app import httpsig
|
||||
from app import indieauth
|
||||
from app import media
|
||||
from app import micropub
|
||||
from app import models
|
||||
from app import templates
|
||||
|
@ -1128,14 +1129,17 @@ def _add_cache_control(headers: dict[str, str]) -> dict[str, str]:
|
|||
return {**headers, "Cache-Control": "max-age=31536000"}
|
||||
|
||||
|
||||
@app.get("/proxy/media/{encoded_url}")
|
||||
@app.get("/proxy/media/{exp}/{sig}/{encoded_url}")
|
||||
async def serve_proxy_media(
|
||||
request: Request,
|
||||
exp: int,
|
||||
sig: str,
|
||||
encoded_url: str,
|
||||
) -> StreamingResponse | PlainTextResponse:
|
||||
# Decode the base64-encoded URL
|
||||
url = base64.urlsafe_b64decode(encoded_url).decode()
|
||||
check_url(url)
|
||||
media.verify_proxied_media_sig(exp, url, sig)
|
||||
|
||||
proxy_resp = await _proxy_get(request, url, stream=True)
|
||||
|
||||
|
@ -1168,9 +1172,11 @@ async def serve_proxy_media(
|
|||
)
|
||||
|
||||
|
||||
@app.get("/proxy/media/{encoded_url}/{size}")
|
||||
@app.get("/proxy/media/{exp}/{sig}/{encoded_url}/{size}")
|
||||
async def serve_proxy_media_resized(
|
||||
request: Request,
|
||||
exp: int,
|
||||
sig: str,
|
||||
encoded_url: str,
|
||||
size: int,
|
||||
) -> PlainTextResponse:
|
||||
|
@ -1180,6 +1186,7 @@ async def serve_proxy_media_resized(
|
|||
# Decode the base64-encoded URL
|
||||
url = base64.urlsafe_b64decode(encoded_url).decode()
|
||||
check_url(url)
|
||||
media.verify_proxied_media_sig(exp, url, sig)
|
||||
|
||||
if cached_resp := _RESIZED_CACHE.get((url, size)):
|
||||
resized_content, resized_mimetype, resp_headers = cached_resp
|
||||
|
|
27
app/media.py
27
app/media.py
|
@ -1,15 +1,40 @@
|
|||
import base64
|
||||
import time
|
||||
|
||||
from app.config import BASE_URL
|
||||
from app.config import hmac_sha256
|
||||
|
||||
SUPPORTED_RESIZE = [50, 740]
|
||||
EXPIRY_PERIOD = 86400
|
||||
EXPIRY_LENGTH = 7
|
||||
|
||||
|
||||
class InvalidProxySignatureError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def proxied_media_sig(expires: int, url: str) -> str:
|
||||
hm = hmac_sha256()
|
||||
hm.update(f'{expires}'.encode())
|
||||
hm.update(b'|')
|
||||
hm.update(url.encode())
|
||||
return base64.urlsafe_b64encode(hm.digest()).decode()
|
||||
|
||||
|
||||
def verify_proxied_media_sig(expires: int, url: str, sig: str) -> None:
|
||||
now = int(time.time() / EXPIRY_PERIOD)
|
||||
expected = proxied_media_sig(expires, url)
|
||||
if now > expires or sig != expected:
|
||||
raise InvalidProxySignatureError("invalid or expired media")
|
||||
|
||||
|
||||
def proxied_media_url(url: str) -> str:
|
||||
if url.startswith(BASE_URL):
|
||||
return url
|
||||
expires = int(time.time() / EXPIRY_PERIOD) + EXPIRY_LENGTH
|
||||
sig = proxied_media_sig(expires, url)
|
||||
|
||||
return BASE_URL + "/proxy/media/" + base64.urlsafe_b64encode(url.encode()).decode()
|
||||
return BASE_URL + f"/proxy/media/{expires}/{sig}/" + base64.urlsafe_b64encode(url.encode()).decode()
|
||||
|
||||
|
||||
def resized_media_url(url: str, size: int) -> str:
|
||||
|
|
|
@ -60,12 +60,7 @@ def _filter_domain(text: str) -> str:
|
|||
def _media_proxy_url(url: str | None) -> str:
|
||||
if not url:
|
||||
return BASE_URL + "/static/nopic.png"
|
||||
|
||||
if url.startswith(BASE_URL):
|
||||
return url
|
||||
|
||||
encoded_url = base64.urlsafe_b64encode(url.encode()).decode()
|
||||
return BASE_URL + f"/proxy/media/{encoded_url}"
|
||||
return proxied_media_url(url)
|
||||
|
||||
|
||||
def is_current_user_admin(request: Request) -> bool:
|
||||
|
|
Loading…
Reference in a new issue