microblog.pub/core/activitypub.py
2019-08-15 00:00:52 +02:00

697 lines
23 KiB
Python

import binascii
import hashlib
import logging
import os
from contextlib import contextmanager
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from urllib.parse import urljoin
from urllib.parse import urlparse
from bson.objectid import ObjectId
from cachetools import LRUCache
from flask import url_for
from little_boxes import activitypub as ap
from little_boxes import strtobool
from little_boxes.activitypub import _to_list
from little_boxes.activitypub import clean_activity
from little_boxes.activitypub import format_datetime
from little_boxes.backend import Backend
from little_boxes.errors import ActivityGoneError
from little_boxes.httpsig import HTTPSigAuth
from config import BASE_URL
from config import DB
from config import EXTRA_INBOXES
from config import ID
from config import KEY
from config import ME
from config import USER_AGENT
from core.db import update_many_activities
from core.meta import Box
from core.meta import MetaKey
from core.meta import flag
from core.meta import upsert
from core.tasks import Tasks
logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
SIG_AUTH = HTTPSigAuth(KEY)
_ACTIVITY_CACHE_ENABLED = True
ACTORS_CACHE = LRUCache(maxsize=256)
MY_PERSON = ap.Person(**ME)
@contextmanager
def no_cache() -> Iterator[None]:
"""Context manager for disabling the "DB cache" when fetching AP activities."""
global _ACTIVITY_CACHE_ENABLED
_ACTIVITY_CACHE_ENABLED = False
try:
yield
finally:
_ACTIVITY_CACHE_ENABLED = True
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
"""Helper for removing MongoDB's `_id` field."""
doc = doc.copy()
if "_id" in doc:
del doc["_id"]
return doc
def _answer_key(choice: str) -> str:
h = hashlib.new("sha1")
h.update(choice.encode())
return h.hexdigest()
def _actor_hash(actor: ap.ActivityType) -> str:
"""Used to know when to update the meta actor cache, like an "actor version"."""
h = hashlib.new("sha1")
h.update(actor.id.encode())
h.update((actor.name or "").encode())
h.update((actor.preferredUsername or "").encode())
h.update((actor.summary or "").encode())
h.update((actor.url or "").encode())
key = actor.get_key()
h.update(key.pubkey_pem.encode())
h.update(key.key_id().encode())
if isinstance(actor.icon, dict) and "url" in actor.icon:
h.update(actor.icon["url"].encode())
return h.hexdigest()
def _is_local_reply(create: ap.Create) -> bool:
for dest in _to_list(create.to or []):
if dest.startswith(BASE_URL):
return True
for dest in _to_list(create.cc or []):
if dest.startswith(BASE_URL):
return True
return False
def save(box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
visibility = ap.get_visibility(activity)
is_public = False
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True
object_id = None
try:
object_id = activity.get_object_id()
except Exception: # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
pass
object_visibility = None
if activity.has_type(
[ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
):
object_visibility = ap.get_visibility(activity.get_object()).name
actor_id = activity.get_actor().id
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {
"undo": False,
"deleted": False,
"public": is_public,
"server": urlparse(activity.id).netloc,
"visibility": visibility.name,
"actor_id": actor_id,
"object_id": object_id,
"object_visibility": object_visibility,
"poll_answer": False,
},
}
)
def outbox_is_blocked(actor_id: str) -> bool:
return bool(
DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor_id,
"meta.undo": False,
}
)
)
def activity_url(item_id: str) -> str:
return urljoin(BASE_URL, url_for("outbox_detail", item_id=item_id))
def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity
actor = activity.get_actor()
if outbox_is_blocked(actor.id):
logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
)
return
if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": activity.id}):
# The activity is already in the inbox
logger.info(f"received duplicate activity {activity!r}, dropping it")
return
save(Box.INBOX, activity)
logger.info(f"spawning tasks for {activity!r}")
if not activity.has_type([ap.ActivityType.DELETE, ap.ActivityType.UPDATE]):
Tasks.cache_actor(activity.id)
Tasks.process_new_activity(activity.id)
Tasks.finish_post_to_inbox(activity.id)
def post_to_outbox(activity: ap.BaseActivity) -> str:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
# Assign create a random ID
obj_id = binascii.hexlify(os.urandom(8)).decode("utf-8")
uri = activity_url(obj_id)
activity._data["id"] = uri
if activity.has_type(ap.ActivityType.CREATE):
activity._data["object"]["id"] = urljoin(
BASE_URL, url_for("outbox_activity", item_id=obj_id)
)
activity._data["object"]["url"] = urljoin(
BASE_URL, url_for("note_by_id", note_id=obj_id)
)
activity.reset_object_cache()
save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id)
return activity.id
class MicroblogPubBackend(Backend):
"""Implements a Little Boxes backend, backed by MongoDB."""
def base_url(self) -> str:
return BASE_URL
def debug_mode(self) -> bool:
return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))
def user_agent(self) -> str:
"""Setup a custom user agent."""
return USER_AGENT
def extra_inboxes(self) -> List[str]:
return EXTRA_INBOXES
def followers(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
def followers_as_recipients(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
recipients = []
for doc in DB.activities.find(q):
recipients.append(
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
)
return list(set(recipients))
def following(self) -> List[str]:
q = {
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
def parse_collection(
self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
) -> List[str]:
"""Resolve/fetch a `Collection`/`OrderedCollection`."""
# Resolve internal collections via MongoDB directly
if url == ID + "/followers":
return self.followers()
elif url == ID + "/following":
return self.following()
return super().parse_collection(payload, url)
def _fetch_iri(self, iri: str) -> ap.ObjectType: # noqa: C901
# Shortcut if the instance actor is fetched
if iri == ME["id"]:
return ME
# Internal collecitons handling
# Followers
if iri == MY_PERSON.followers:
followers = []
for data in DB.activities.find(
{
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
):
followers.append(data["meta"]["actor_id"])
return {"type": "Collection", "items": followers}
# Following
if iri == MY_PERSON.following:
following = []
for data in DB.activities.find(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
):
following.append(data["meta"]["object_id"])
return {"type": "Collection", "items": following}
# TODO(tsileo): handle the liked collection too
# Check if the activity is owned by this server
if iri.startswith(BASE_URL):
is_a_note = False
if iri.endswith("/activity"):
iri = iri.replace("/activity", "")
is_a_note = True
data = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
if data and data["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
if data and is_a_note:
return data["activity"]["object"]
elif data:
return data["activity"]
else:
# Check if the activity is stored in the inbox
data = DB.activities.find_one({"remote_id": iri})
if data:
if data["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
return data["activity"]
# Check if we're looking for an object wrapped in a Create
obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
if obj:
if obj["meta"]["deleted"]:
raise ActivityGoneError(f"{iri} is gone")
return obj["meta"].get("object") or obj["activity"]["object"]
# TODO(tsileo): also check the REPLIES box
# Check if it's cached because it's a follower
# Remove extra info (like the key hash if any)
cleaned_iri = iri.split("#")[0]
actor = DB.activities.find_one(
{"meta.actor_id": cleaned_iri, "meta.actor": {"$exists": True}}
)
# "type" check is here to skip old metadata for "old/buggy" followers
if (
actor
and actor["meta"].get("actor")
and "type" in actor["meta"]["actor"]
):
return actor["meta"]["actor"]
# Check if it's cached because it's a following
actor2 = DB.activities.find_one(
{
"meta.object_id": cleaned_iri,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
)
if (
actor2
and actor2["meta"].get("object")
and "type" in actor2["meta"]["object"]
):
return actor2["meta"]["object"]
# Fetch the URL via HTTP
logger.info(f"dereference {iri} via HTTP")
return super().fetch_iri(iri)
def fetch_iri(self, iri: str, no_cache=False) -> ap.ObjectType:
if not no_cache and _ACTIVITY_CACHE_ENABLED:
# Fetch the activity by checking the local DB first
data = self._fetch_iri(iri)
logger.debug(f"_fetch_iri({iri!r}) == {data!r}")
else:
# Pass the SIG_AUTH to enable "authenticated fetch"
data = super().fetch_iri(iri, auth=SIG_AUTH)
logger.debug(f"fetch_iri({iri!r}) == {data!r}")
return data
def set_post_to_remote_inbox(self, cb):
self.post_to_remote_inbox_cb = cb
def _handle_replies_delete(
self, as_actor: ap.Person, in_reply_to: Optional[str]
) -> None:
if not in_reply_to:
pass
DB.activities.update_one(
{"activity.object.id": in_reply_to},
{"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}},
)
def _process_question_reply(self, create: ap.Create, question: ap.Question) -> None:
choice = create.get_object().name
# Ensure it's a valid choice
if choice not in [
c["name"] for c in question._data.get("oneOf", question.anyOf)
]:
logger.info("invalid choice")
return
# Check for duplicate votes
if DB.activities.find_one(
{
"activity.object.actor": create.get_actor().id,
"meta.answer_to": question.id,
}
):
logger.info("duplicate response")
return
# Update the DB
answer_key = _answer_key(choice)
DB.activities.update_one(
{"activity.object.id": question.id},
{
"$inc": {
"meta.question_replies": 1,
f"meta.question_answers.{answer_key}": 1,
}
},
)
DB.activities.update_one(
{"remote_id": create.id},
{
"$set": {
"meta.answer_to": question.id,
"meta.stream": False,
"meta.poll_answer": True,
}
},
)
return None
def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None:
"""Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent"
key to make it easy to query a whole thread."""
in_reply_to = create.get_object().get_in_reply_to()
if not in_reply_to:
return
new_threads = []
root_reply = in_reply_to
reply = ap.fetch_remote_activity(root_reply)
# FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that)
# FIXME(tsileo): can be a 403 too, in this case what to do? not error at least
# Ensure the this is a local reply, of a question, with a direct "to" addressing
if (
reply.id.startswith(BASE_URL)
and reply.has_type(ap.ActivityType.QUESTION.value)
and _is_local_reply(create)
and not create.is_public()
):
return self._process_question_reply(create, reply)
elif (
create.id.startswith(BASE_URL)
and reply.has_type(ap.ActivityType.QUESTION.value)
and not create.is_public()
):
# Keep track of our own votes
DB.activities.update_one(
{"activity.object.id": reply.id, "box": "inbox"},
{"$set": {"meta.voted_for": create.get_object().name}},
)
return None
creply = DB.activities.find_one_and_update(
{"activity.object.id": in_reply_to},
{"$inc": {"meta.count_reply": 1, "meta.count_direct_reply": 1}},
)
if not creply:
# It means the activity is not in the inbox, and not in the outbox, we want to save it
save(Box.REPLIES, reply)
new_threads.append(reply.id)
# TODO(tsileo): parses the replies collection and import the replies?
while reply is not None:
in_reply_to = reply.get_in_reply_to()
if not in_reply_to:
break
root_reply = in_reply_to
reply = ap.fetch_remote_activity(root_reply)
# FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that)
q = {"activity.object.id": root_reply}
if not DB.activities.count(q):
save(Box.REPLIES, reply)
new_threads.append(reply.id)
DB.activities.update_one(
{"remote_id": create.id}, {"$set": {"meta.thread_root_parent": root_reply}}
)
DB.activities.update(
{"box": Box.REPLIES.value, "remote_id": {"$in": new_threads}},
{"$set": {"meta.thread_root_parent": root_reply}},
)
def embed_collection(total_items, first_page_id):
"""Helper creating a root OrderedCollection with a link to the first page."""
return {
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"totalItems": total_items,
"first": f"{first_page_id}?page=first",
"id": first_page_id,
}
def simple_build_ordered_collection(col_name, data):
return {
"@context": ap.COLLECTION_CTX,
"id": BASE_URL + "/" + col_name,
"totalItems": len(data),
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"orderedItems": data,
}
def build_ordered_collection(
col, q=None, cursor=None, map_func=None, limit=50, col_name=None, first_page=False
):
"""Helper for building an OrderedCollection from a MongoDB query (with pagination support)."""
col_name = col_name or col.name
if q is None:
q = {}
if cursor:
q["_id"] = {"$lt": ObjectId(cursor)}
data = list(col.find(q, limit=limit).sort("_id", -1))
if not data:
# Returns an empty page if there's a cursor
if cursor:
return {
"@context": ap.COLLECTION_CTX,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
"id": BASE_URL + "/" + col_name + "?cursor=" + cursor,
"partOf": BASE_URL + "/" + col_name,
"totalItems": 0,
"orderedItems": [],
}
return {
"@context": ap.COLLECTION_CTX,
"id": BASE_URL + "/" + col_name,
"totalItems": 0,
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"orderedItems": [],
}
start_cursor = str(data[0]["_id"])
next_page_cursor = str(data[-1]["_id"])
total_items = col.find(q).count()
data = [_remove_id(doc) for doc in data]
if map_func:
data = [map_func(doc) for doc in data]
# No cursor, this is the first page and we return an OrderedCollection
if not cursor:
resp = {
"@context": ap.COLLECTION_CTX,
"id": f"{BASE_URL}/{col_name}",
"totalItems": total_items,
"type": ap.ActivityType.ORDERED_COLLECTION.value,
"first": {
"id": f"{BASE_URL}/{col_name}?cursor={start_cursor}",
"orderedItems": data,
"partOf": f"{BASE_URL}/{col_name}",
"totalItems": total_items,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
},
}
if len(data) == limit:
resp["first"]["next"] = (
BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor
)
if first_page:
return resp["first"]
return resp
# If there's a cursor, then we return an OrderedCollectionPage
resp = {
"@context": ap.COLLECTION_CTX,
"type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
"id": BASE_URL + "/" + col_name + "?cursor=" + start_cursor,
"totalItems": total_items,
"partOf": BASE_URL + "/" + col_name,
"orderedItems": data,
}
if len(data) == limit:
resp["next"] = BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor
if first_page:
return resp["first"]
# XXX(tsileo): implements prev with prev=<first item cursor>?
return resp
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
activity = raw_doc["activity"]
if (
ap._has_type(activity["type"], ap.ActivityType.CREATE)
and "object" in activity
and ap._has_type(activity["object"]["type"], ap.ActivityType.QUESTION)
):
for choice in activity["object"].get("oneOf", activity["object"].get("anyOf")):
choice["replies"] = {
"type": ap.ActivityType.COLLECTION.value,
"totalItems": raw_doc["meta"]
.get("question_answers", {})
.get(_answer_key(choice["name"]), 0),
}
now = datetime.now(timezone.utc)
if format_datetime(now) >= activity["object"]["endTime"]:
activity["object"]["closed"] = activity["object"]["endTime"]
def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
if raw_doc["activity"]["type"] != ap.ActivityType.CREATE.value:
return raw_doc
raw_doc["activity"]["object"]["replies"] = embed_collection(
raw_doc.get("meta", {}).get("count_direct_reply", 0),
f'{raw_doc["remote_id"]}/replies',
)
raw_doc["activity"]["object"]["likes"] = embed_collection(
raw_doc.get("meta", {}).get("count_like", 0), f'{raw_doc["remote_id"]}/likes'
)
raw_doc["activity"]["object"]["shares"] = embed_collection(
raw_doc.get("meta", {}).get("count_boost", 0), f'{raw_doc["remote_id"]}/shares'
)
return raw_doc
def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
if "@context" in activity:
del activity["@context"]
return activity
def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
raw_doc = add_extra_collection(raw_doc)
activity = clean_activity(raw_doc["activity"])
# Handle Questions
# TODO(tsileo): what about object embedded by ID/URL?
_add_answers_to_question(raw_doc)
if embed:
return remove_context(activity)
return activity
def _cache_actor_icon(actor: ap.BaseActivity) -> None:
if actor.icon:
if isinstance(actor.icon, dict) and "url" in actor.icon:
Tasks.cache_actor_icon(actor.icon["url"], actor.id)
else:
logger.warning(f"failed to parse icon {actor.icon} for {actor!r}")
def update_cached_actor(actor: ap.BaseActivity) -> None:
actor_hash = _actor_hash(actor)
update_many_activities(
{
**flag(MetaKey.ACTOR_ID, actor.id),
**flag(MetaKey.ACTOR_HASH, {"$ne": actor_hash}),
},
upsert(
{MetaKey.ACTOR: actor.to_dict(embed=True), MetaKey.ACTOR_HASH: actor_hash}
),
)
update_many_activities(
{
**flag(MetaKey.OBJECT_ACTOR_ID, actor.id),
**flag(MetaKey.OBJECT_ACTOR_HASH, {"$ne": actor_hash}),
},
upsert(
{
MetaKey.OBJECT_ACTOR: actor.to_dict(embed=True),
MetaKey.OBJECT_ACTOR_HASH: actor_hash,
}
),
)
# TODO(tsileo): Also update following (it's in the object)
# DB.activities.update_many(
# {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}}
# )
_cache_actor_icon(actor)