mirror of
https://git.sr.ht/~tsileo/microblog.pub
synced 2024-11-15 03:04:28 +00:00
Try poussetaches
This commit is contained in:
parent
5b38c5e723
commit
066309a0c8
7 changed files with 673 additions and 57 deletions
|
@ -16,6 +16,7 @@ install:
|
|||
- sudo chmod +x /usr/local/bin/docker-compose
|
||||
- docker-compose --version
|
||||
- pip install -r dev-requirements.txt
|
||||
- git clone https://github.com/tsileo/poussetaches.git && cd poussetaches && docker build . -t poussetaches:latest && cd -
|
||||
script:
|
||||
- mypy --ignore-missing-imports .
|
||||
- flake8 activitypub.py
|
||||
|
|
612
app.py
612
app.py
|
@ -16,6 +16,8 @@ from typing import Tuple
|
|||
from urllib.parse import urlencode
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from requests.exceptions import HTTPError
|
||||
import requests
|
||||
import bleach
|
||||
import mf2py
|
||||
import pymongo
|
||||
|
@ -41,7 +43,10 @@ from little_boxes.activitypub import _to_list
|
|||
from little_boxes.activitypub import clean_activity
|
||||
from little_boxes.activitypub import get_backend
|
||||
from little_boxes.content_helper import parse_markdown
|
||||
from little_boxes.linked_data_sig import generate_signature
|
||||
from little_boxes.errors import ActivityGoneError
|
||||
from little_boxes.errors import NotAnActivityError
|
||||
from little_boxes.errors import BadActivityError
|
||||
from little_boxes.errors import ActivityNotFoundError
|
||||
from little_boxes.errors import Error
|
||||
from little_boxes.errors import NotFromOutboxError
|
||||
|
@ -49,15 +54,18 @@ from little_boxes.httpsig import HTTPSigAuth
|
|||
from little_boxes.httpsig import verify_request
|
||||
from little_boxes.webfinger import get_actor_url
|
||||
from little_boxes.webfinger import get_remote_follow_template
|
||||
from utils import opengraph
|
||||
from passlib.hash import bcrypt
|
||||
from u2flib_server import u2f
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
import activitypub
|
||||
import config
|
||||
import tasks
|
||||
|
||||
# import tasks
|
||||
from activitypub import Box
|
||||
from activitypub import embed_collection
|
||||
from config import USER_AGENT
|
||||
from config import ADMIN_API_KEY
|
||||
from config import BASE_URL
|
||||
from config import DB
|
||||
|
@ -78,6 +86,11 @@ from utils.key import get_secret_key
|
|||
from utils.lookup import lookup
|
||||
from utils.media import Kind
|
||||
|
||||
from poussetaches import PousseTaches
|
||||
|
||||
p = PousseTaches("http://poussetaches:7991", "http://web:5005")
|
||||
|
||||
|
||||
back = activitypub.MicroblogPubBackend()
|
||||
ap.use_backend(back)
|
||||
|
||||
|
@ -191,7 +204,7 @@ ALLOWED_TAGS = [
|
|||
def clean_html(html):
|
||||
try:
|
||||
return bleach.clean(html, tags=ALLOWED_TAGS)
|
||||
except:
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
|
@ -631,7 +644,7 @@ def authorize_follow():
|
|||
return redirect("/following")
|
||||
|
||||
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
|
||||
tasks.post_to_outbox(follow)
|
||||
post_to_outbox(follow)
|
||||
|
||||
return redirect("/following")
|
||||
|
||||
|
@ -758,7 +771,7 @@ def tmp_migrate4():
|
|||
@login_required
|
||||
def tmp_migrate5():
|
||||
for activity in DB.activities.find():
|
||||
tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False)
|
||||
Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False)
|
||||
|
||||
return "Done"
|
||||
|
||||
|
@ -835,9 +848,10 @@ def _get_cached(type_="html", arg=None):
|
|||
cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg})
|
||||
if cached:
|
||||
app.logger.info("from cache")
|
||||
return cached['response_data']
|
||||
return cached["response_data"]
|
||||
return None
|
||||
|
||||
|
||||
def _cache(resp, type_="html", arg=None):
|
||||
if not CACHING:
|
||||
return None
|
||||
|
@ -855,7 +869,9 @@ def _cache(resp, type_="html", arg=None):
|
|||
def index():
|
||||
if is_api_request():
|
||||
return jsonify(**ME)
|
||||
cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
|
||||
cache_arg = (
|
||||
f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}"
|
||||
)
|
||||
cached = _get_cached("html", cache_arg)
|
||||
if cached:
|
||||
return cached
|
||||
|
@ -1197,7 +1213,7 @@ def outbox():
|
|||
data = request.get_json(force=True)
|
||||
print(data)
|
||||
activity = ap.parse_activity(data)
|
||||
activity_id = tasks.post_to_outbox(activity)
|
||||
activity_id = post_to_outbox(activity)
|
||||
|
||||
return Response(status=201, headers={"Location": activity_id})
|
||||
|
||||
|
@ -1536,11 +1552,15 @@ def _user_api_get_note(from_outbox: bool = False):
|
|||
oid = _user_api_arg("id")
|
||||
app.logger.info(f"fetching {oid}")
|
||||
try:
|
||||
note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.NOTE)
|
||||
except:
|
||||
note = ap.parse_activity(
|
||||
get_backend().fetch_iri(oid), expected=ActivityType.NOTE
|
||||
)
|
||||
except Exception:
|
||||
try:
|
||||
note = ap.parse_activity(get_backend().fetch_iri(oid), expected=ActivityType.VIDEO)
|
||||
except:
|
||||
note = ap.parse_activity(
|
||||
get_backend().fetch_iri(oid), expected=ActivityType.VIDEO
|
||||
)
|
||||
except Exception:
|
||||
raise ActivityNotFoundError(
|
||||
"Expected Note or Video ActivityType, but got something else"
|
||||
)
|
||||
|
@ -1570,7 +1590,7 @@ def api_delete():
|
|||
|
||||
delete = ap.Delete(actor=ID, object=ap.Tombstone(id=note.id).to_dict(embed=True))
|
||||
|
||||
delete_id = tasks.post_to_outbox(delete)
|
||||
delete_id = post_to_outbox(delete)
|
||||
|
||||
return _user_api_response(activity=delete_id)
|
||||
|
||||
|
@ -1581,7 +1601,7 @@ def api_boost():
|
|||
note = _user_api_get_note()
|
||||
|
||||
announce = note.build_announce(MY_PERSON)
|
||||
announce_id = tasks.post_to_outbox(announce)
|
||||
announce_id = post_to_outbox(announce)
|
||||
|
||||
return _user_api_response(activity=announce_id)
|
||||
|
||||
|
@ -1592,7 +1612,7 @@ def api_like():
|
|||
note = _user_api_get_note()
|
||||
|
||||
like = note.build_like(MY_PERSON)
|
||||
like_id = tasks.post_to_outbox(like)
|
||||
like_id = post_to_outbox(like)
|
||||
|
||||
return _user_api_response(activity=like_id)
|
||||
|
||||
|
@ -1639,7 +1659,7 @@ def api_undo():
|
|||
obj = ap.parse_activity(doc.get("activity"))
|
||||
# FIXME(tsileo): detect already undo-ed and make this API call idempotent
|
||||
undo = obj.build_undo()
|
||||
undo_id = tasks.post_to_outbox(undo)
|
||||
undo_id = post_to_outbox(undo)
|
||||
|
||||
return _user_api_response(activity=undo_id)
|
||||
|
||||
|
@ -1664,7 +1684,7 @@ def admin_stream():
|
|||
)
|
||||
|
||||
|
||||
@app.route("/inbox", methods=["GET", "POST"])
|
||||
@app.route("/inbox", methods=["GET", "POST"]) # noqa: C901
|
||||
def inbox():
|
||||
if request.method == "GET":
|
||||
if not is_api_request():
|
||||
|
@ -1733,7 +1753,7 @@ def inbox():
|
|||
)
|
||||
activity = ap.parse_activity(data)
|
||||
logger.debug(f"inbox activity={activity}/{data}")
|
||||
tasks.post_to_inbox(activity)
|
||||
post_to_inbox(activity)
|
||||
|
||||
return Response(status=201)
|
||||
|
||||
|
@ -1819,7 +1839,7 @@ def api_new_note():
|
|||
|
||||
note = ap.Note(**raw_note)
|
||||
create = note.build_create()
|
||||
create_id = tasks.post_to_outbox(create)
|
||||
create_id = post_to_outbox(create)
|
||||
|
||||
return _user_api_response(activity=create_id)
|
||||
|
||||
|
@ -1852,7 +1872,7 @@ def api_block():
|
|||
return _user_api_response(activity=existing["activity"]["id"])
|
||||
|
||||
block = ap.Block(actor=MY_PERSON.id, object=actor)
|
||||
block_id = tasks.post_to_outbox(block)
|
||||
block_id = post_to_outbox(block)
|
||||
|
||||
return _user_api_response(activity=block_id)
|
||||
|
||||
|
@ -1874,7 +1894,7 @@ def api_follow():
|
|||
return _user_api_response(activity=existing["activity"]["id"])
|
||||
|
||||
follow = ap.Follow(actor=MY_PERSON.id, object=actor)
|
||||
follow_id = tasks.post_to_outbox(follow)
|
||||
follow_id = post_to_outbox(follow)
|
||||
|
||||
return _user_api_response(activity=follow_id)
|
||||
|
||||
|
@ -1895,8 +1915,9 @@ def followers():
|
|||
)
|
||||
|
||||
raw_followers, older_than, newer_than = paginated_query(DB.activities, q)
|
||||
followers = [doc["meta"]["actor"]
|
||||
for doc in raw_followers if "actor" in doc.get("meta", {})]
|
||||
followers = [
|
||||
doc["meta"]["actor"] for doc in raw_followers if "actor" in doc.get("meta", {})
|
||||
]
|
||||
return render_template(
|
||||
"followers.html",
|
||||
followers_data=followers,
|
||||
|
@ -1924,9 +1945,11 @@ def following():
|
|||
abort(404)
|
||||
|
||||
following, older_than, newer_than = paginated_query(DB.activities, q)
|
||||
following = [(doc["remote_id"], doc["meta"]["object"])
|
||||
following = [
|
||||
(doc["remote_id"], doc["meta"]["object"])
|
||||
for doc in following
|
||||
if "remote_id" in doc and "object" in doc.get("meta", {})]
|
||||
if "remote_id" in doc and "object" in doc.get("meta", {})
|
||||
]
|
||||
return render_template(
|
||||
"following.html",
|
||||
following_data=following,
|
||||
|
@ -2087,7 +2110,7 @@ def indieauth_flow():
|
|||
return redirect(red)
|
||||
|
||||
|
||||
@app.route('/indieauth', methods=['GET', 'POST'])
|
||||
@app.route("/indieauth", methods=["GET", "POST"])
|
||||
def indieauth_endpoint():
|
||||
if request.method == "GET":
|
||||
if not session.get("logged_in"):
|
||||
|
@ -2189,9 +2212,7 @@ def token_endpoint():
|
|||
@app.route("/feed.json")
|
||||
def json_feed():
|
||||
return Response(
|
||||
response=json.dumps(
|
||||
activitypub.json_feed("/feed.json")
|
||||
),
|
||||
response=json.dumps(activitypub.json_feed("/feed.json")),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
|
@ -2210,3 +2231,538 @@ def rss_feed():
|
|||
response=activitypub.gen_feed().rss_str(),
|
||||
headers={"Content-Type": "application/rss+xml"},
|
||||
)
|
||||
|
||||
|
||||
@app.route("/task/t1")
|
||||
def task_t1():
|
||||
p.push(
|
||||
"https://mastodon.cloud/@iulius/101852467780804071/activity",
|
||||
"/task/cache_object",
|
||||
)
|
||||
return "ok"
|
||||
|
||||
|
||||
@app.route("/task/t2", methods=["POST"])
|
||||
def task_t2():
|
||||
print(request)
|
||||
print(request.headers)
|
||||
task = p.parse(request)
|
||||
print(task)
|
||||
return "yay"
|
||||
|
||||
|
||||
@app.route("/task/fetch_og_meta", methods=["POST"])
|
||||
def task_fetch_og_metadata():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
note = activity.get_object()
|
||||
links = opengraph.links_from_note(note.to_dict())
|
||||
og_metadata = opengraph.fetch_og_metadata(USER_AGENT, links)
|
||||
for og in og_metadata:
|
||||
if not og.get("image"):
|
||||
continue
|
||||
MEDIA_CACHE.cache_og_image(og["image"])
|
||||
|
||||
app.logger.debug(f"OG metadata {og_metadata!r}")
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri}, {"$set": {"meta.og_metadata": og_metadata}}
|
||||
)
|
||||
|
||||
app.logger.info(f"OG metadata fetched for {iri}")
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"dropping activity {iri}, skip OG metedata")
|
||||
return ""
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if 400 <= http_err.response.status_code < 500:
|
||||
app.logger.exception("bad request, no retry")
|
||||
return ""
|
||||
app.logger.exception("failed to fetch OG metadata")
|
||||
abort(500)
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to fetch OG metadata for {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/cache_object", methods=["POST"])
|
||||
def task_cache_object():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
print(activity)
|
||||
print(activity.__dict__)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
obj = activity
|
||||
# obj = activity.get_object()
|
||||
DB.activities.update_one(
|
||||
{"remote_id": activity.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": obj.to_dict(embed=True),
|
||||
"meta.object_actor": activitypub._actor_to_meta(obj.get_actor()),
|
||||
}
|
||||
},
|
||||
)
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
app.logger.exception(f"flagging activity {iri} as deleted, no object caching")
|
||||
return ""
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to cache object for {iri}")
|
||||
abort(500)
|
||||
return ""
|
||||
|
||||
|
||||
class Tasks:
|
||||
@staticmethod
|
||||
def cache_object(iri: str) -> None:
|
||||
p.push(iri, "/task/cache_object")
|
||||
|
||||
@staticmethod
|
||||
def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
|
||||
p.push(
|
||||
{"iri": iri, "also_cache_attachments": also_cache_attachments},
|
||||
"/task/cache_actor",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def post_to_remote_inbox(payload: str, recp: str) -> None:
|
||||
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
|
||||
|
||||
@staticmethod
|
||||
def forward_activity(iri: str) -> None:
|
||||
p.push(iri, "/task/forward_activity")
|
||||
|
||||
@staticmethod
|
||||
def fetch_og_meta(iri: str) -> None:
|
||||
p.push(iri, "/task/fetch_og_meta")
|
||||
|
||||
@staticmethod
|
||||
def process_new_activity(iri: str) -> None:
|
||||
p.push(iri, "/task/process_new_activity")
|
||||
|
||||
@staticmethod
|
||||
def cache_attachments(iri: str) -> None:
|
||||
p.push(iri, "/task/cache_attachments")
|
||||
|
||||
@staticmethod
|
||||
def finish_post_to_inbox(iri: str) -> None:
|
||||
p.push(iri, "/task/finish_post_to_inbox")
|
||||
|
||||
@staticmethod
|
||||
def finish_post_to_outbox(iri: str) -> None:
|
||||
p.push(iri, "/task/finish_post_to_outbox")
|
||||
|
||||
|
||||
@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
|
||||
def task_finish_post_to_outbox():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
recipients = activity.recipients()
|
||||
|
||||
if activity.has_type(ap.ActivityType.DELETE):
|
||||
back.outbox_delete(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UPDATE):
|
||||
back.outbox_update(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
back.outbox_create(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.outbox_announce(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.LIKE):
|
||||
back.outbox_like(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UNDO):
|
||||
obj = activity.get_object()
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
back.outbox_undo_like(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.outbox_undo_announce(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||
back.undo_new_following(MY_PERSON, obj)
|
||||
|
||||
app.logger.info(f"recipients={recipients}")
|
||||
activity = ap.clean_activity(activity.to_dict())
|
||||
|
||||
DB.cache2.remove()
|
||||
|
||||
payload = json.dumps(activity)
|
||||
for recp in recipients:
|
||||
app.logger.debug(f"posting to {recp}")
|
||||
Tasks.post_to_remote_inbox(payload, recp)
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.exception(f"no retry")
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to post to remote inbox for {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/finish_post_to_inbox", methods=["POST"]) # noqa: C901
|
||||
def task_finish_post_to_inbox():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
if activity.has_type(ap.ActivityType.DELETE):
|
||||
back.inbox_delete(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.UPDATE):
|
||||
back.inbox_update(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
back.inbox_create(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.inbox_announce(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.LIKE):
|
||||
back.inbox_like(MY_PERSON, activity)
|
||||
elif activity.has_type(ap.ActivityType.FOLLOW):
|
||||
# Reply to a Follow with an Accept
|
||||
accept = ap.Accept(actor=ID, object=activity.to_dict(embed=True))
|
||||
post_to_outbox(accept)
|
||||
elif activity.has_type(ap.ActivityType.UNDO):
|
||||
obj = activity.get_object()
|
||||
if obj.has_type(ap.ActivityType.LIKE):
|
||||
back.inbox_undo_like(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.ANNOUNCE):
|
||||
back.inbox_undo_announce(MY_PERSON, obj)
|
||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||
back.undo_new_follower(MY_PERSON, obj)
|
||||
try:
|
||||
invalidate_cache(activity)
|
||||
except Exception:
|
||||
app.logger.exception("failed to invalidate cache")
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
app.logger.exception(f"no retry")
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to cache attachments for {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||
if activity.has_type(ap.CREATE_TYPES):
|
||||
activity = activity.build_create()
|
||||
|
||||
# Assign create a random ID
|
||||
obj_id = back.random_object_id()
|
||||
activity.set_id(back.activity_url(obj_id), obj_id)
|
||||
|
||||
back.save(Box.OUTBOX, activity)
|
||||
Tasks.cache_actor(activity.id)
|
||||
Tasks.finish_post_to_outbox(activity.id)
|
||||
return activity.id
|
||||
|
||||
|
||||
def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||
# Check for Block activity
|
||||
actor = activity.get_actor()
|
||||
if back.outbox_is_blocked(MY_PERSON, actor.id):
|
||||
app.logger.info(
|
||||
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
|
||||
)
|
||||
return
|
||||
|
||||
if back.inbox_check_duplicate(MY_PERSON, activity.id):
|
||||
# The activity is already in the inbox
|
||||
app.logger.info(f"received duplicate activity {activity!r}, dropping it")
|
||||
|
||||
back.save(Box.INBOX, activity)
|
||||
Tasks.process_new_activity(activity.id)
|
||||
|
||||
app.logger.info(f"spawning task for {activity!r}")
|
||||
Tasks.finish_post_to_inbox(activity.id)
|
||||
|
||||
|
||||
def invalidate_cache(activity):
|
||||
if activity.has_type(ap.ActivityType.LIKE):
|
||||
if activity.get_object().id.startswith(BASE_URL):
|
||||
DB.cache2.remove()
|
||||
elif activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
if activity.get_object().id.startswith(BASE_URL):
|
||||
DB.cache2.remove()
|
||||
elif activity.has_type(ap.ActivityType.UNDO):
|
||||
DB.cache2.remove()
|
||||
elif activity.has_type(ap.ActivityType.DELETE):
|
||||
# TODO(tsileo): only invalidate if it's a delete of a reply
|
||||
DB.cache2.remove()
|
||||
elif activity.has_type(ap.ActivityType.UPDATE):
|
||||
DB.cache2.remove()
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
note = activity.get_object()
|
||||
if not note.inReplyTo or note.inReplyTo.startswith(ID):
|
||||
DB.cache2.remove()
|
||||
# FIXME(tsileo): check if it's a reply of a reply
|
||||
|
||||
|
||||
@app.route("/task/cache_attachments", methods=["POST"])
|
||||
def task_cache_attachments():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
# Generates thumbnails for the actor's icon and the attachments if any
|
||||
|
||||
actor = activity.get_actor()
|
||||
|
||||
# Update the cached actor
|
||||
DB.actors.update_one(
|
||||
{"remote_id": iri},
|
||||
{"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
|
||||
upsert=True,
|
||||
)
|
||||
|
||||
if actor.icon:
|
||||
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
|
||||
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
for attachment in activity.get_object()._data.get("attachment", []):
|
||||
if (
|
||||
attachment.get("mediaType", "").startswith("image/")
|
||||
or attachment.get("type") == ap.ActivityType.IMAGE.value
|
||||
):
|
||||
try:
|
||||
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
|
||||
except ValueError:
|
||||
app.logger.exception(f"failed to cache {attachment}")
|
||||
|
||||
app.logger.info(f"attachments cached for {iri}")
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||
app.logger.exception(f"dropping activity {iri}, no attachment caching")
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to cache attachments for {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/cache_actor", methods=["POST"])
|
||||
def task_cache_actor():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri, also_cache_attachments = (
|
||||
task.payload["iri"],
|
||||
task.payload.get("also_cache_attachments", True),
|
||||
)
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
if activity.has_type(ap.ActivityType.CREATE):
|
||||
Tasks.fetch_og_metadata(iri)
|
||||
|
||||
if activity.has_type([ap.ActivityType.LIKE, ap.ActivityType.ANNOUNCE]):
|
||||
Tasks.cache_object(iri)
|
||||
|
||||
actor = activity.get_actor()
|
||||
|
||||
cache_actor_with_inbox = False
|
||||
if activity.has_type(ap.ActivityType.FOLLOW):
|
||||
if actor.id != ID:
|
||||
# It's a Follow from the Inbox
|
||||
cache_actor_with_inbox = True
|
||||
else:
|
||||
# It's a new following, cache the "object" (which is the actor we follow)
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri},
|
||||
{
|
||||
"$set": {
|
||||
"meta.object": activitypub._actor_to_meta(
|
||||
activity.get_object()
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Cache the actor info
|
||||
DB.activities.update_one(
|
||||
{"remote_id": iri},
|
||||
{
|
||||
"$set": {
|
||||
"meta.actor": activitypub._actor_to_meta(
|
||||
actor, cache_actor_with_inbox
|
||||
)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
app.logger.info(f"actor cached for {iri}")
|
||||
if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
|
||||
Tasks.cache_attachments(iri)
|
||||
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
DB.activities.update_one({"remote_id": iri}, {"$set": {"meta.deleted": True}})
|
||||
app.logger.exception(f"flagging activity {iri} as deleted, no actor caching")
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to cache actor for {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/process_new_activity", methods=["POST"]) # noqa:c901
|
||||
def task_process_new_activity():
|
||||
"""Process an activity received in the inbox"""
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
app.logger.info(f"activity={activity!r}")
|
||||
|
||||
# Is the activity expected?
|
||||
# following = ap.get_backend().following()
|
||||
should_forward = False
|
||||
should_delete = False
|
||||
|
||||
tag_stream = False
|
||||
if activity.has_type(ap.ActivityType.ANNOUNCE):
|
||||
try:
|
||||
activity.get_object()
|
||||
tag_stream = True
|
||||
except (NotAnActivityError, BadActivityError):
|
||||
app.logger.exception(f"failed to get announce object for {activity!r}")
|
||||
# Most likely on OStatus notice
|
||||
tag_stream = False
|
||||
should_delete = True
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
# The announced activity is deleted/gone, drop it
|
||||
should_delete = True
|
||||
|
||||
elif activity.has_type(ap.ActivityType.CREATE):
|
||||
note = activity.get_object()
|
||||
# Make the note part of the stream if it's not a reply, or if it's a local reply
|
||||
if not note.inReplyTo or note.inReplyTo.startswith(ID):
|
||||
tag_stream = True
|
||||
|
||||
if note.inReplyTo:
|
||||
try:
|
||||
reply = ap.fetch_remote_activity(note.inReplyTo)
|
||||
if (
|
||||
reply.id.startswith(ID) or reply.has_mention(ID)
|
||||
) and activity.is_public():
|
||||
# The reply is public "local reply", forward the reply (i.e. the original activity) to the
|
||||
# original recipients
|
||||
should_forward = True
|
||||
except NotAnActivityError:
|
||||
# Most likely a reply to an OStatus notce
|
||||
should_delete = True
|
||||
|
||||
# (partial) Ghost replies handling
|
||||
# [X] This is the first time the server has seen this Activity.
|
||||
should_forward = False
|
||||
local_followers = ID + "/followers"
|
||||
for field in ["to", "cc"]:
|
||||
if field in activity._data:
|
||||
if local_followers in activity._data[field]:
|
||||
# [X] The values of to, cc, and/or audience contain a Collection owned by the server.
|
||||
should_forward = True
|
||||
|
||||
# [X] The values of inReplyTo, object, target and/or tag are objects owned by the server
|
||||
if not (note.inReplyTo and note.inReplyTo.startswith(ID)):
|
||||
should_forward = False
|
||||
|
||||
elif activity.has_type(ap.ActivityType.DELETE):
|
||||
note = DB.activities.find_one(
|
||||
{"activity.object.id": activity.get_object().id}
|
||||
)
|
||||
if note and note["meta"].get("forwarded", False):
|
||||
# If the activity was originally forwarded, forward the delete too
|
||||
should_forward = True
|
||||
|
||||
elif activity.has_type(ap.ActivityType.LIKE):
|
||||
if not activity.get_object_id().startswith(BASE_URL):
|
||||
# We only want to keep a like if it's a like for a local activity
|
||||
# (Pleroma relay the likes it received, we don't want to store them)
|
||||
should_delete = True
|
||||
|
||||
if should_forward:
|
||||
app.logger.info(f"will forward {activity!r} to followers")
|
||||
Tasks.forward_activity(activity.id)
|
||||
|
||||
if should_delete:
|
||||
app.logger.info(f"will soft delete {activity!r}")
|
||||
|
||||
app.logger.info(f"{iri} tag_stream={tag_stream}")
|
||||
DB.activities.update_one(
|
||||
{"remote_id": activity.id},
|
||||
{
|
||||
"$set": {
|
||||
"meta.stream": tag_stream,
|
||||
"meta.forwarded": should_forward,
|
||||
"meta.deleted": should_delete,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
app.logger.info(f"new activity {iri} processed")
|
||||
if not should_delete and not activity.has_type(ap.ActivityType.DELETE):
|
||||
Tasks.cache_actor(iri)
|
||||
except (ActivityGoneError, ActivityNotFoundError):
|
||||
app.logger.log.exception(f"dropping activity {iri}, skip processing")
|
||||
except Exception:
|
||||
app.logger.exception(f"failed to process new activity {iri}")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/forward_activity")
|
||||
def task_forward_activity():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
iri = task.payload
|
||||
try:
|
||||
activity = ap.fetch_remote_activity(iri)
|
||||
recipients = back.followers_as_recipients()
|
||||
app.logger.debug(f"Forwarding {activity!r} to {recipients}")
|
||||
activity = ap.clean_activity(activity.to_dict())
|
||||
payload = json.dumps(activity)
|
||||
for recp in recipients:
|
||||
app.logger.debug(f"forwarding {activity!r} to {recp}")
|
||||
Tasks.post_to_remote_inbox(payload, recp)
|
||||
except Exception:
|
||||
app.logger.exception("task failed")
|
||||
abort(500)
|
||||
|
||||
|
||||
@app.route("/task/post_to_remote_inbox")
|
||||
def task_post_to_remote_inbox():
|
||||
task = p.parse(request)
|
||||
app.logger.info(f"task={task!r}")
|
||||
payload, to = task.payload["payload"], task.payload["to"]
|
||||
try:
|
||||
app.logger.info("payload=%s", payload)
|
||||
app.logger.info("generating sig")
|
||||
signed_payload = json.loads(payload)
|
||||
|
||||
# Don't overwrite the signature if we're forwarding an activity
|
||||
if "signature" not in signed_payload:
|
||||
generate_signature(signed_payload, KEY)
|
||||
|
||||
app.logger.info("to=%s", to)
|
||||
resp = requests.post(
|
||||
to,
|
||||
data=json.dumps(signed_payload),
|
||||
auth=SIG_AUTH,
|
||||
headers={
|
||||
"Content-Type": HEADERS[1],
|
||||
"Accept": HEADERS[1],
|
||||
"User-Agent": USER_AGENT,
|
||||
},
|
||||
)
|
||||
app.logger.info("resp=%s", resp)
|
||||
app.logger.info("resp_body=%s", resp.text)
|
||||
resp.raise_for_status()
|
||||
except HTTPError as err:
|
||||
app.logger.exception("request failed")
|
||||
if 400 >= err.response.status_code >= 499:
|
||||
app.logger.info("client error, no retry")
|
||||
return ""
|
||||
|
||||
abort(500)
|
||||
|
|
17
config.py
17
config.py
|
@ -105,12 +105,17 @@ MEDIA_CACHE = MediaCache(GRIDFS, USER_AGENT)
|
|||
def create_indexes():
|
||||
DB.activities.create_index([("remote_id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([("activity.object.id", pymongo.ASCENDING)])
|
||||
DB.activities.create_index([
|
||||
("activity.object.id", pymongo.ASCENDING),
|
||||
("meta.deleted", pymongo.ASCENDING),
|
||||
])
|
||||
DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)])
|
||||
DB.cache2.create_index("date", expireAfterSeconds=3600*12)
|
||||
DB.activities.create_index(
|
||||
[("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING)]
|
||||
)
|
||||
DB.cache2.create_index(
|
||||
[
|
||||
("path", pymongo.ASCENDING),
|
||||
("type", pymongo.ASCENDING),
|
||||
("arg", pymongo.ASCENDING),
|
||||
]
|
||||
)
|
||||
DB.cache2.create_index("date", expireAfterSeconds=3600 * 12)
|
||||
|
||||
# Index for the block query
|
||||
DB.activities.create_index(
|
||||
|
|
|
@ -4,9 +4,6 @@ services:
|
|||
image: 'microblogpub:latest'
|
||||
ports:
|
||||
- "${WEB_PORT}:5005"
|
||||
links:
|
||||
- mongo
|
||||
- rmq
|
||||
volumes:
|
||||
- "${CONFIG_DIR}:/app/config"
|
||||
- "./static:/app/static"
|
||||
|
@ -14,12 +11,10 @@ services:
|
|||
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
|
||||
- MICROBLOGPUB_MONGODB_HOST=mongo:27017
|
||||
- MICROBLOGPUB_DEBUG=1
|
||||
- POUSSETACHES_AUTH_KEY=123
|
||||
celery:
|
||||
# image: "instance1_web"
|
||||
image: 'microblogpub:latest'
|
||||
links:
|
||||
- mongo
|
||||
- rmq
|
||||
command: 'celery worker -l info -A tasks'
|
||||
volumes:
|
||||
- "${CONFIG_DIR}:/app/config"
|
||||
|
@ -35,6 +30,10 @@ services:
|
|||
environment:
|
||||
- RABBITMQ_ERLANG_COOKIE=secretrabbit
|
||||
- RABBITMQ_NODENAME=rabbit@my-rabbit
|
||||
poussetaches:
|
||||
image: "poussetaches:latest"
|
||||
environment:
|
||||
- POUSSETACHES_AUTH_KEY=123
|
||||
networks:
|
||||
default:
|
||||
name: microblogpubfede
|
||||
|
|
|
@ -7,12 +7,14 @@ services:
|
|||
links:
|
||||
- mongo
|
||||
- rmq
|
||||
- poussetaches
|
||||
volumes:
|
||||
- "${CONFIG_DIR}:/app/config"
|
||||
- "./static:/app/static"
|
||||
environment:
|
||||
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
|
||||
- MICROBLOGPUB_MONGODB_HOST=mongo:27017
|
||||
- POUSSETACHES_AUTH_KEY=123
|
||||
celery:
|
||||
image: 'microblogpub:latest'
|
||||
links:
|
||||
|
@ -36,3 +38,7 @@ services:
|
|||
- RABBITMQ_NODENAME=rabbit@my-rabbit
|
||||
volumes:
|
||||
- "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq"
|
||||
poussetaches:
|
||||
image: "poussetaches:latest"
|
||||
environment:
|
||||
- POUSSETACHES_AUTH_KEY=123
|
||||
|
|
48
poussetaches.py
Normal file
48
poussetaches.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
import base64
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
from dataclasses import dataclass
|
||||
import flask
|
||||
import requests
|
||||
|
||||
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
req_id: str
|
||||
tries: int
|
||||
|
||||
payload: Any
|
||||
|
||||
|
||||
class PousseTaches:
|
||||
def __init__(self, api_url: str, base_url: str) -> None:
|
||||
self.api_url = api_url
|
||||
self.base_url = base_url
|
||||
|
||||
def push(self, payload: Any, path: str, expected=200) -> str:
|
||||
# Encode our payload
|
||||
p = base64.b64encode(json.dumps(payload).encode()).decode()
|
||||
|
||||
# Queue/push it
|
||||
resp = requests.post(
|
||||
self.api_url,
|
||||
json={"url": self.base_url + path, "payload": p, "expected": expected},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
return resp.headers.get("Poussetaches-Task-ID")
|
||||
|
||||
def parse(self, req: flask.Request) -> Task:
|
||||
if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
|
||||
raise ValueError("Bad auth key")
|
||||
|
||||
# Parse the "envelope"
|
||||
envelope = json.loads(req.data)
|
||||
print(req)
|
||||
print(f"envelope={envelope!r}")
|
||||
payload = json.loads(base64.b64decode(envelope["payload"]))
|
||||
|
||||
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload)
|
1
tasks.py
1
tasks.py
|
@ -339,6 +339,7 @@ def invalidate_cache(activity):
|
|||
DB.cache2.remove()
|
||||
# FIXME(tsileo): check if it's a reply of a reply
|
||||
|
||||
|
||||
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901
|
||||
def finish_post_to_inbox(self, iri: str) -> None:
|
||||
try:
|
||||
|
|
Loading…
Reference in a new issue