microblog.pub/core/activitypub.py

813 lines
27 KiB
Python
Raw Normal View History

import binascii
2019-04-14 17:17:54 +00:00
import hashlib
2018-05-27 12:21:06 +00:00
import logging
2018-06-24 17:51:09 +00:00
import os
2018-05-18 18:41:41 +00:00
from datetime import datetime
from datetime import timezone
2018-06-16 20:02:10 +00:00
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urljoin
from urllib.parse import urlparse
2018-05-18 18:41:41 +00:00
from bson.objectid import ObjectId
from flask import url_for
2018-07-11 21:22:47 +00:00
from little_boxes import activitypub as ap
from little_boxes import strtobool
from little_boxes.activitypub import _to_list
from little_boxes.activitypub import clean_activity
from little_boxes.activitypub import format_datetime
2018-07-11 21:22:47 +00:00
from little_boxes.backend import Backend
from little_boxes.errors import ActivityGoneError
2019-08-11 12:25:43 +00:00
from little_boxes.httpsig import HTTPSigAuth
2018-05-18 18:41:41 +00:00
2018-06-16 20:02:10 +00:00
from config import BASE_URL
from config import DB
from config import DEFAULT_CTX
2018-06-16 20:02:10 +00:00
from config import ID
2019-08-11 12:25:43 +00:00
from config import KEY
2018-06-16 20:02:10 +00:00
from config import ME
from config import USER_AGENT
from core.db import find_one_activity
2019-08-11 09:41:16 +00:00
from core.db import update_many_activities
2019-08-25 09:09:34 +00:00
from core.db import update_one_activity
2019-08-01 20:00:26 +00:00
from core.meta import Box
2019-09-01 08:26:25 +00:00
from core.meta import FollowStatus
2019-08-11 09:41:16 +00:00
from core.meta import MetaKey
from core.meta import by_object_id
2019-08-16 12:42:15 +00:00
from core.meta import by_remote_id
2019-08-17 16:38:15 +00:00
from core.meta import by_type
2019-08-11 09:41:16 +00:00
from core.meta import flag
2019-08-16 12:42:15 +00:00
from core.meta import inc
2019-08-11 09:41:16 +00:00
from core.meta import upsert
from core.remote import server
2019-08-01 20:25:58 +00:00
from core.tasks import Tasks
2019-08-16 12:42:15 +00:00
from utils import now
2018-06-16 20:33:51 +00:00
2018-05-27 12:21:06 +00:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type alias for a free-form metadata mapping (string keys, arbitrary values).
_NewMeta = Dict[str, Any]
2019-08-11 12:25:43 +00:00
# HTTP Signature auth object built from the configured KEY; handed to the
# parent backend's `fetch_iri` to enable "authenticated fetch".
SIG_AUTH = HTTPSigAuth(KEY)
2018-05-18 18:41:41 +00:00
# The instance's own actor, built from the ME config dict; its `followers` and
# `following` IRIs are used to recognize the internal collections.
MY_PERSON = ap.Person(**ME)
2018-07-11 18:04:48 +00:00
2019-09-01 08:26:25 +00:00
# Network location part of BASE_URL, compared against by `is_local_url`.
_LOCAL_NETLOC = urlparse(BASE_URL).netloc
def is_from_outbox(activity: ap.BaseActivity) -> bool:
    """Return True when the activity ID starts with this instance's `BASE_URL`."""
    activity_id = activity.id
    return activity_id.startswith(BASE_URL)
def is_local_url(url: str) -> bool:
    """Return True when `url` has the same network location as `BASE_URL`."""
    netloc = urlparse(url).netloc
    return netloc == _LOCAL_NETLOC
2018-07-11 18:04:48 +00:00
2018-06-16 19:24:53 +00:00
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
2018-05-27 20:30:43 +00:00
"""Helper for removing MongoDB's `_id` field."""
2018-05-18 18:41:41 +00:00
doc = doc.copy()
2018-06-16 19:24:53 +00:00
if "_id" in doc:
2019-04-13 08:00:56 +00:00
del doc["_id"]
2018-05-18 18:41:41 +00:00
return doc
2019-04-14 17:17:54 +00:00
def _answer_key(choice: str) -> str:
h = hashlib.new("sha1")
h.update(choice.encode())
return h.hexdigest()
def _actor_hash(actor: ap.ActivityType) -> str:
"""Used to know when to update the meta actor cache, like an "actor version"."""
h = hashlib.new("sha1")
h.update(actor.id.encode())
h.update((actor.name or "").encode())
h.update((actor.preferredUsername or "").encode())
h.update((actor.summary or "").encode())
h.update((actor.url or "").encode())
key = actor.get_key()
h.update(key.pubkey_pem.encode())
h.update(key.key_id().encode())
if isinstance(actor.icon, dict) and "url" in actor.icon:
h.update(actor.icon["url"].encode())
return h.hexdigest()
2019-07-17 14:14:29 +00:00
def _is_local_reply(create: ap.Create) -> bool:
    """Return True if any `to`/`cc` recipient of `create` starts with `BASE_URL`."""
    for recipients in (create.to, create.cc):
        if any(dest.startswith(BASE_URL) for dest in _to_list(recipients or [])):
            return True

    return False
def save(box: Box, activity: ap.BaseActivity) -> None:
    """Custom helper for saving an activity to the DB.

    The activity is inserted in `DB.activities` along with a `meta` document
    (visibility, actor/object IDs, publication date and some type-specific
    metadata).
    """
    visibility = ap.get_visibility(activity)
    is_public = visibility in (ap.Visibility.PUBLIC, ap.Visibility.UNLISTED)

    # TODO(tsileo): should be ValueError, but replies trigger a KeyError on object
    try:
        object_id = activity.get_object_id()
    except Exception:
        object_id = None

    # Only these activity types get the visibility of their object recorded
    if activity.has_type(
        [ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE, ap.ActivityType.LIKE]
    ):
        object_visibility = ap.get_visibility(activity.get_object()).name
    else:
        object_visibility = None

    actor_id = activity.get_actor().id

    # Set some "type"-related meta
    extra: Dict[str, Any] = {}
    if box == Box.OUTBOX and activity.has_type(ap.ActivityType.FOLLOW):
        extra[MetaKey.FOLLOW_STATUS.value] = FollowStatus.WAITING.value
    elif activity.has_type(ap.ActivityType.CREATE):
        obj = activity.get_object()
        extra[MetaKey.MENTIONS.value] = [m.href for m in obj.get_mentions()]
        # Strip the leading "#" of each hashtag
        extra[MetaKey.HASHTAGS.value] = [h.name[1:] for h in obj.get_hashtags()]

    document = {
        "box": box.value,
        "activity": activity.to_dict(),
        "type": _to_list(activity.type),
        "remote_id": activity.id,
        "meta": {
            MetaKey.UNDO.value: False,
            MetaKey.DELETED.value: False,
            MetaKey.PUBLIC.value: is_public,
            MetaKey.SERVER.value: urlparse(activity.id).netloc,
            MetaKey.VISIBILITY.value: visibility.name,
            MetaKey.ACTOR_ID.value: actor_id,
            MetaKey.OBJECT_ID.value: object_id,
            MetaKey.OBJECT_VISIBILITY.value: object_visibility,
            MetaKey.POLL_ANSWER.value: False,
            # Fall back to the current time when the activity has no date
            MetaKey.PUBLISHED.value: activity.published or now(),
            **extra,
        },
    }
    DB.activities.insert_one(document)
def outbox_is_blocked(actor_id: str) -> bool:
    """Return True if the outbox holds a non-undone Block targeting `actor_id`."""
    block_query = {
        "box": Box.OUTBOX.value,
        "type": ap.ActivityType.BLOCK.value,
        "activity.object": actor_id,
        "meta.undo": False,
    }
    match = DB.activities.find_one(block_query)
    return bool(match)
def activity_url(item_id: str) -> str:
    """Build the absolute URL of the `outbox_detail` route for `item_id`."""
    path = url_for("outbox_detail", item_id=item_id)
    return urljoin(BASE_URL, path)
def post_to_inbox(activity: ap.BaseActivity) -> None:
    """Process an activity received in the inbox.

    Activities from blocked actors and duplicates are dropped, forwarded
    activities (relay, Honk, Hubzilla) are handed over to the reply processing
    task, everything else is saved in the inbox and the follow-up tasks are
    spawned.
    """
    actor = activity.get_actor()

    # Check for Block activity
    if outbox_is_blocked(actor.id):
        logger.info(
            f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
        )
        return

    # If the message is coming from a Pleroma relay, we process it as a possible reply for a stream activity
    from_relay = (
        actor.has_type(ap.ActivityType.APPLICATION)
        and actor.id.endswith("/relay")
        and activity.has_type(ap.ActivityType.ANNOUNCE)
    )
    if from_relay:
        announced_id = activity.get_object_id()
        # Only process objects that are neither wrapped in a known Create nor stored as a reply
        already_known = find_one_activity(
            {**by_object_id(announced_id), **by_type(ap.ActivityType.CREATE)}
        ) or DB.replies.find_one(by_remote_id(announced_id))
        if not already_known:
            Tasks.process_reply(announced_id)
            return

    # Hubzilla sends Update with the same ID as the actor, and it poisons the cache
    if (
        activity.has_type(ap.ActivityType.UPDATE)
        and activity.id == activity.get_object_id()
    ):
        # Start a task to update the cached actor
        Tasks.cache_actor(activity.id)
        return

    # Honk forwards activities in a Read, process them as replies
    if activity.has_type(ap.ActivityType.READ):
        Tasks.process_reply(activity.get_object_id())
        return

    # TODO(tsileo): support ignore from Honk

    # Hubzilla forwards activities in a Create, process them as possible replies
    if activity.has_type(ap.ActivityType.CREATE) and server(activity.id) != server(
        activity.get_object_id()
    ):
        Tasks.process_reply(activity.get_object_id())
        return

    duplicate = DB.activities.find_one(
        {"box": Box.INBOX.value, "remote_id": activity.id}
    )
    if duplicate:
        # The activity is already in the inbox
        logger.info(f"received duplicate activity {activity!r}, dropping it")
        return

    save(Box.INBOX, activity)
    logger.info(f"spawning tasks for {activity!r}")

    # The actor is not cached for Delete/Update activities
    skip_actor_cache = activity.has_type(
        [ap.ActivityType.DELETE, ap.ActivityType.UPDATE]
    )
    if not skip_actor_cache:
        Tasks.cache_actor(activity.id)
    Tasks.process_new_activity(activity.id)
    Tasks.finish_post_to_inbox(activity.id)
def save_reply(
    activity: ap.BaseActivity, meta: Optional[Dict[str, Any]] = None
) -> None:
    """Save an activity in the `replies` collection.

    Args:
        activity: the activity to store.
        meta: optional extra keys merged into the stored `meta` document
            (they override the computed ones on conflict).
    """
    # `None` is used as the default to avoid sharing one mutable dict across calls
    extra_meta = meta or {}

    visibility = ap.get_visibility(activity)
    is_public = visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]

    # Fall back to the current time when the activity has no publication date
    published = activity.published if activity.published else now()
    DB.replies.insert_one(
        {
            "activity": activity.to_dict(),
            "type": _to_list(activity.type),
            "remote_id": activity.id,
            "meta": {
                "undo": False,
                "deleted": False,
                "public": is_public,
                "server": urlparse(activity.id).netloc,
                "visibility": visibility.name,
                "actor_id": activity.get_actor().id,
                MetaKey.PUBLISHED.value: published,
                **extra_meta,
            },
        }
    )
2019-09-08 08:34:45 +00:00
def new_context(parent: Optional[ap.BaseActivity] = None) -> str:
    """`context` is here to group related activities, it's not meant to be resolved.
    We're just following the convention.

    Args:
        parent: optional parent activity whose `context` (a string, or a dict
            with an `id`) or `conversation` is reused when available.

    Returns:
        The inherited context, or a freshly generated local context URL when
        the parent provides nothing usable.
    """
    # Copy the context from the parent if any
    if parent:
        inherited = None
        parent_ctx = parent.context
        if isinstance(parent_ctx, str):
            inherited = parent_ctx
        elif isinstance(parent_ctx, dict):
            inherited = parent_ctx.get("id")

        # Fall back to the conversation when the context is not usable
        inherited = inherited or parent.conversation
        if inherited:
            return inherited
        # Nothing usable on the parent: generate a new context below instead
        # of returning None

    # Generate a new context
    ctx_id = binascii.hexlify(os.urandom(12)).decode("utf-8")
    return urljoin(BASE_URL, f"/contexts/{ctx_id}")
def post_to_outbox(activity: ap.BaseActivity) -> str:
    """Assign an ID to `activity`, save it in the outbox and spawn the follow-up tasks.

    Returns the ID (URL) assigned to the activity.
    """
    # Objects that can be wrapped in a Create are wrapped first
    if activity.has_type(ap.CREATE_TYPES):
        activity = activity.build_create()

    # Assign create a random ID
    obj_id = os.urandom(12).hex()
    activity._data["id"] = activity_url(obj_id)

    if activity.has_type(ap.ActivityType.CREATE):
        wrapped = activity._data["object"]
        wrapped["id"] = urljoin(BASE_URL, url_for("outbox_activity", item_id=obj_id))
        wrapped["url"] = urljoin(BASE_URL, url_for("note_by_id", note_id=obj_id))
        # The embedded object was modified, drop the cached version
        activity.reset_object_cache()

    save(Box.OUTBOX, activity)
    Tasks.cache_actor(activity.id)
    Tasks.finish_post_to_outbox(activity.id)

    return activity.id
2018-06-16 19:24:53 +00:00
class MicroblogPubBackend(Backend):
    """Implements a Little Boxes backend, backed by MongoDB."""

    def ap_context(self) -> Any:
        """Return the JSON-LD context (`DEFAULT_CTX`) of this instance."""
        return DEFAULT_CTX

    def base_url(self) -> str:
        """Return the configured `BASE_URL`."""
        return BASE_URL

    def debug_mode(self) -> bool:
        """Return the debug flag read from the `MICROBLOGPUB_DEBUG` env variable."""
        return strtobool(os.getenv("MICROBLOGPUB_DEBUG", "false"))

    def user_agent(self) -> str:
        """Setup a custom user agent."""
        return USER_AGENT

    def followers(self) -> List[str]:
        """Return the `actor` of every non-undone Follow stored in the inbox."""
        q = {
            "box": Box.INBOX.value,
            "type": ap.ActivityType.FOLLOW.value,
            "meta.undo": False,
        }
        return [doc["activity"]["actor"] for doc in DB.activities.find(q)]

    def followers_as_recipients(self) -> List[str]:
        """Return the deduplicated inbox URLs of the followers.

        The shared inbox of the cached actor is preferred over its own inbox.
        """
        q = {
            "box": Box.INBOX.value,
            "type": ap.ActivityType.FOLLOW.value,
            "meta.undo": False,
        }
        recipients = []
        for doc in DB.activities.find(q):
            recipients.append(
                doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
            )

        return list(set(recipients))

    def following(self) -> List[str]:
        """Return the `object` of every non-undone Follow stored in the outbox."""
        q = {
            "box": Box.OUTBOX.value,
            "type": ap.ActivityType.FOLLOW.value,
            "meta.undo": False,
        }
        return [doc["activity"]["object"] for doc in DB.activities.find(q)]

    def parse_collection(
        self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
    ) -> List[str]:
        """Resolve/fetch a `Collection`/`OrderedCollection`."""
        # Resolve internal collections via MongoDB directly
        if url == ID + "/followers":
            return self.followers()
        elif url == ID + "/following":
            return self.following()

        return super().parse_collection(payload, url)

    def _fetch_iri(self, iri: str) -> ap.ObjectType:  # noqa: C901
        """Resolve `iri` from the local data when possible, via HTTP otherwise.

        Raises:
            ActivityGoneError: if the matching stored activity is flagged as deleted.
        """
        # Shortcut if the instance actor is fetched
        if iri == ME["id"]:
            return ME

        # Internal collections handling
        # Followers
        if iri == MY_PERSON.followers:
            followers = []
            for data in DB.activities.find(
                {
                    "box": Box.INBOX.value,
                    "type": ap.ActivityType.FOLLOW.value,
                    "meta.undo": False,
                }
            ):
                followers.append(data["meta"]["actor_id"])
            return {"type": "Collection", "items": followers}

        # Following
        if iri == MY_PERSON.following:
            following = []
            for data in DB.activities.find(
                {
                    "box": Box.OUTBOX.value,
                    "type": ap.ActivityType.FOLLOW.value,
                    "meta.undo": False,
                }
            ):
                following.append(data["meta"]["object_id"])
            return {"type": "Collection", "items": following}

        # TODO(tsileo): handle the liked collection too

        # Check if the activity is owned by this server
        if iri.startswith(BASE_URL):
            note_suffix = "/activity"
            is_a_note = iri.endswith(note_suffix)
            if is_a_note:
                # Only strip the trailing suffix: `str.replace` would also
                # remove any other "/activity" occurrence in the IRI (e.g. in
                # the host name or in the base path)
                iri = iri[: -len(note_suffix)]
            data = DB.activities.find_one({"box": Box.OUTBOX.value, "remote_id": iri})
            if data and data["meta"]["deleted"]:
                raise ActivityGoneError(f"{iri} is gone")
            if data and is_a_note:
                return data["activity"]["object"]
            elif data:
                return data["activity"]
        else:
            # Check if the activity is stored in the inbox
            data = DB.activities.find_one({"remote_id": iri})
            if data:
                if data["meta"]["deleted"]:
                    raise ActivityGoneError(f"{iri} is gone")
                return data["activity"]

            # Check if we're looking for an object wrapped in a Create
            obj = DB.activities.find_one({"meta.object_id": iri, "type": "Create"})
            if obj:
                if obj["meta"]["deleted"]:
                    raise ActivityGoneError(f"{iri} is gone")
                return obj["meta"].get("object") or obj["activity"]["object"]
            # TODO(tsileo): also check the REPLIES box

            # Check if it's cached because it's a follower
            # Remove extra info (like the key hash if any)
            cleaned_iri = iri.split("#")[0]
            actor = DB.activities.find_one(
                {"meta.actor_id": cleaned_iri, "meta.actor": {"$exists": True}}
            )

            # "type" check is here to skip old metadata for "old/buggy" followers
            if (
                actor
                and actor["meta"].get("actor")
                and "type" in actor["meta"]["actor"]
            ):
                return actor["meta"]["actor"]

            # Check if it's cached because it's a following
            actor2 = DB.activities.find_one(
                {
                    "meta.object_id": cleaned_iri,
                    "type": ap.ActivityType.FOLLOW.value,
                    "meta.undo": False,
                }
            )
            if (
                actor2
                and actor2["meta"].get("object")
                and "type" in actor2["meta"]["object"]
            ):
                return actor2["meta"]["object"]

        # Check the replies collection
        reply = DB.replies.find_one(by_remote_id(iri))
        if reply:
            return reply["activity"]

        # Fetch the URL via HTTP
        logger.info(f"dereference {iri} via HTTP")
        return super().fetch_iri(iri)

    def fetch_iri(self, iri: str, **kwargs: Any) -> ap.ObjectType:
        """Fetch `iri`, checking the local DB first unless `no_cache=True` is given."""
        if not kwargs.pop("no_cache", False):
            # Fetch the activity by checking the local DB first
            data = self._fetch_iri(iri)
            logger.debug(f"_fetch_iri({iri!r}) == {data!r}")
        else:
            # Pass the SIG_AUTH to enable "authenticated fetch"
            data = super().fetch_iri(iri, auth=SIG_AUTH)
            logger.debug(f"fetch_iri({iri!r}) == {data!r}")

        return data
2018-05-18 18:41:41 +00:00
def embed_collection(total_items, first_page_id):
    """Helper creating a root OrderedCollection with a link to the first page."""
    collection = {
        "type": ap.ActivityType.ORDERED_COLLECTION.value,
        "totalItems": total_items,
    }
    collection["first"] = f"{first_page_id}?page=first"
    collection["id"] = first_page_id
    return collection
def simple_build_ordered_collection(col_name, data):
    """Build a single (non paginated) OrderedCollection holding all of `data`."""
    collection_id = "/".join((BASE_URL, col_name))
    item_count = len(data)
    return {
        "@context": DEFAULT_CTX,
        "id": collection_id,
        "totalItems": item_count,
        "type": ap.ActivityType.ORDERED_COLLECTION.value,
        "orderedItems": data,
    }
2018-06-16 19:24:53 +00:00
def build_ordered_collection(
    col, q=None, cursor=None, map_func=None, limit=50, col_name=None, first_page=False
):
    """Helper for building an OrderedCollection from a MongoDB query (with pagination support).

    Args:
        col: the MongoDB collection to query.
        q: optional query filter (it is copied, never mutated).
        cursor: optional `_id` (as a string) of the last item of the previous
            page, only older documents are returned.
        map_func: optional function applied to each document (once its `_id`
            has been removed).
        limit: maximum number of items per page.
        col_name: name used to build the collection URL (defaults to `col.name`).
        first_page: when no cursor is given, return the embedded first page
            instead of the root collection.

    Returns:
        An OrderedCollection dict when there is no cursor, an
        OrderedCollectionPage dict otherwise.
    """
    col_name = col_name or col.name
    # Work on a copy so the caller's query dict is not modified by the cursor filter
    q = dict(q) if q else {}

    if cursor:
        q["_id"] = {"$lt": ObjectId(cursor)}
    data = list(col.find(q, limit=limit).sort("_id", -1))

    if not data:
        # Returns an empty page if there's a cursor
        if cursor:
            return {
                "@context": DEFAULT_CTX,
                "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
                "id": BASE_URL + "/" + col_name + "?cursor=" + cursor,
                "partOf": BASE_URL + "/" + col_name,
                "totalItems": 0,
                "orderedItems": [],
            }
        return {
            "@context": DEFAULT_CTX,
            "id": BASE_URL + "/" + col_name,
            "totalItems": 0,
            "type": ap.ActivityType.ORDERED_COLLECTION.value,
            "orderedItems": [],
        }

    start_cursor = str(data[0]["_id"])
    next_page_cursor = str(data[-1]["_id"])
    # NOTE: the count uses the same filter as the page (cursor included)
    total_items = col.find(q).count()

    data = [_remove_id(doc) for doc in data]
    if map_func:
        data = [map_func(doc) for doc in data]

    # No cursor, this is the first page and we return an OrderedCollection
    if not cursor:
        resp = {
            "@context": DEFAULT_CTX,
            "id": f"{BASE_URL}/{col_name}",
            "totalItems": total_items,
            "type": ap.ActivityType.ORDERED_COLLECTION.value,
            "first": {
                "id": f"{BASE_URL}/{col_name}?cursor={start_cursor}",
                "orderedItems": data,
                "partOf": f"{BASE_URL}/{col_name}",
                "totalItems": total_items,
                "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
            },
        }

        # A full page means there may be more items to fetch
        if len(data) == limit:
            resp["first"]["next"] = (
                BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor
            )

        if first_page:
            return resp["first"]

        return resp

    # If there's a cursor, then we return an OrderedCollectionPage
    resp = {
        "@context": DEFAULT_CTX,
        "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value,
        "id": BASE_URL + "/" + col_name + "?cursor=" + start_cursor,
        "totalItems": total_items,
        "partOf": BASE_URL + "/" + col_name,
        "orderedItems": data,
    }
    if len(data) == limit:
        resp["next"] = BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor

    # `resp` is already a page here (it has no "first" key), so it is returned
    # as-is whatever the value of `first_page`

    # XXX(tsileo): implements prev with prev=<first item cursor>?

    return resp
def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None:
    """Add the answer counters (and the `closed` date) to an embedded Question.

    The `activity` of `raw_doc` is modified in place. Nothing is done unless
    the activity is a Create wrapping a fully embedded Question object.
    """
    activity = raw_doc["activity"]
    obj = activity.get("object")
    # An object embedded by ID/URL (a plain string) cannot be augmented
    if not (
        ap._has_type(activity["type"], ap.ActivityType.CREATE)
        and isinstance(obj, dict)
        and ap._has_type(obj["type"], ap.ActivityType.QUESTION)
    ):
        return

    answers = raw_doc.get("meta", {}).get("question_answers", {})
    # Tolerate a Question without any `oneOf`/`anyOf` choices
    choices = obj.get("oneOf", obj.get("anyOf")) or []
    for choice in choices:
        choice["replies"] = {
            "type": ap.ActivityType.COLLECTION.value,
            "totalItems": answers.get(_answer_key(choice["name"]), 0),
        }

    # Flag the poll as closed once its end time is reached (the formatted
    # dates are compared as strings)
    end_time = obj.get("endTime")
    current_time = datetime.now(timezone.utc)
    if end_time and format_datetime(current_time) >= end_time:
        obj["closed"] = end_time
def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]:
    """Embed the `replies`, `likes` and `shares` collections in the object of a Create.

    `raw_doc` is modified in place and returned, any other activity type is
    returned untouched.
    """
    if not ap._has_type(raw_doc["activity"]["type"], ap.ActivityType.CREATE.value):
        return raw_doc

    # Each collection is named after the URL suffix it is served from
    counters = (
        ("replies", MetaKey.COUNT_REPLY),
        ("likes", MetaKey.COUNT_LIKE),
        ("shares", MetaKey.COUNT_BOOST),
    )
    for name, counter_key in counters:
        total = raw_doc.get("meta", {}).get(counter_key.value, 0)
        collection = embed_collection(total, f'{raw_doc["remote_id"]}/{name}')
        raw_doc["activity"]["object"][name] = collection

    return raw_doc
def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]:
    """Drop the `@context` key (if any) from `activity` in place and return it."""
    activity.pop("@context", None)
    return activity
def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, Any]:
    """Build the activity to serve from a stored document.

    The extra collections are added, the activity is cleaned and the poll
    answers are filled in; the `@context` is removed when `embed` is set.
    """
    raw_doc = add_extra_collection(raw_doc)
    activity = clean_activity(raw_doc["activity"])

    # Handle Questions
    # TODO(tsileo): what about object embedded by ID/URL?
    _add_answers_to_question(raw_doc)

    return remove_context(activity) if embed else activity
2019-08-11 09:41:16 +00:00
def _cache_actor_icon(actor: ap.BaseActivity) -> None:
    """Spawn the task caching the actor icon, if the actor has a usable one."""
    icon = actor.icon
    if not icon:
        return

    if isinstance(icon, dict) and "url" in icon:
        Tasks.cache_actor_icon(icon["url"], actor.id)
        return

    # The icon is set, but not in a supported format
    logger.warning(f"failed to parse icon {actor.icon} for {actor!r}")
def update_cached_actor(actor: ap.BaseActivity) -> None:
    """Refresh the cached copies of `actor` stored in the activities/replies meta.

    Only the documents whose stored actor hash differs from the current one
    are updated. The actor icon and emojis caching is triggered afterwards.
    """
    actor_hash = _actor_hash(actor)

    def _outdated(id_key, hash_key):
        """Filter matching the documents referencing the actor with another hash."""
        return {
            **flag(id_key, actor.id),
            **flag(hash_key, {"$ne": actor_hash}),
        }

    def _refreshed(actor_key, hash_key):
        """Update storing the current actor along with its hash."""
        return upsert({actor_key: actor.to_dict(embed=True), hash_key: actor_hash})

    # Activities sent by the actor
    update_many_activities(
        _outdated(MetaKey.ACTOR_ID, MetaKey.ACTOR_HASH),
        _refreshed(MetaKey.ACTOR, MetaKey.ACTOR_HASH),
    )
    # Activities whose object was authored by the actor
    update_many_activities(
        _outdated(MetaKey.OBJECT_ACTOR_ID, MetaKey.OBJECT_ACTOR_HASH),
        _refreshed(MetaKey.OBJECT_ACTOR, MetaKey.OBJECT_ACTOR_HASH),
    )
    # Replies sent by the actor
    DB.replies.update_many(
        _outdated(MetaKey.ACTOR_ID, MetaKey.ACTOR_HASH),
        _refreshed(MetaKey.ACTOR, MetaKey.ACTOR_HASH),
    )
    # TODO(tsileo): Also update following (it's in the object)
    # DB.activities.update_many(
    #     {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}}
    # )
    _cache_actor_icon(actor)
    Tasks.cache_emojis(actor)
2019-08-18 10:39:19 +00:00
def handle_question_reply(create: ap.Create, question: ap.Question) -> None:
    """Record the vote carried by `create` for the local poll `question`.

    Invalid choices and duplicate votes are ignored. Otherwise the answer
    counters of the question are incremented and the reply is flagged as a
    poll answer (and removed from the stream).
    """
    choice = create.get_object().name

    # Ensure it's a valid choice (a question without choices accepts nothing)
    choices = question._data.get("oneOf", question.anyOf) or []
    if choice not in [c["name"] for c in choices]:
        logger.info("invalid choice")
        return

    # Hash the choice/answer (so we can use it as a key)
    answer_key = _answer_key(choice)

    is_single_choice = bool(question._data.get("oneOf", []))
    dup_query = {
        "activity.object.actor": create.get_actor().id,
        # Must match the key written below once a vote is recorded
        "meta.poll_answer_to": question.id,
        **({} if is_single_choice else {"meta.poll_answer_choice": choice}),
    }

    logger.debug(f"dup_q={dup_query}")
    # Check for duplicate votes
    if DB.activities.find_one(dup_query):
        logger.info("duplicate response")
        return

    # Update the DB
    DB.activities.update_one(
        {**by_object_id(question.id), **by_type(ap.ActivityType.CREATE)},
        {
            "$inc": {
                "meta.question_replies": 1,
                f"meta.question_answers.{answer_key}": 1,
            }
        },
    )

    DB.activities.update_one(
        by_remote_id(create.id),
        {
            "$set": {
                "meta.poll_answer_to": question.id,
                "meta.poll_answer_choice": choice,
                "meta.stream": False,
                "meta.poll_answer": True,
            }
        },
    )

    return None
def handle_replies(create: ap.Create) -> None:
    """Process a Create whose object is a reply.

    Poll answers (received for a local question, or sent by this instance) are
    recorded and hidden from the UI. Regular replies get some metadata about
    the activity they answer, the reply counter of the parent is incremented
    and a task is spawned to process the reply.
    """
    in_reply_to = create.get_object().get_in_reply_to()
    if not in_reply_to:
        return

    parent = ap.fetch_remote_activity(in_reply_to)
    if parent.has_type(ap.ActivityType.CREATE):
        parent = parent.get_object()
    # FIXME(tsileo): can be a 403 too, in this case what to do? not error at least

    # Ensure the this is a local reply, of a question, with a direct "to" addressing
    if (
        parent.id.startswith(BASE_URL)
        and parent.has_type(ap.ActivityType.QUESTION.value)
        and _is_local_reply(create)
        and not create.is_public()
    ):
        return handle_question_reply(create, parent)

    if (
        create.id.startswith(BASE_URL)
        and parent.has_type(ap.ActivityType.QUESTION.value)
        and not create.is_public()
    ):
        # Keep track of our own votes
        vote_key = _answer_key(create.get_object().name)
        DB.activities.update_one(
            {"activity.object.id": parent.id, "box": "inbox"},
            {"$set": {f"meta.poll_answers_sent.{vote_key}": True}},
        )
        # Mark our reply as a poll answers, to "hide" it from the UI
        update_one_activity(
            by_remote_id(create.id),
            upsert({MetaKey.POLL_ANSWER: True, MetaKey.POLL_ANSWER_TO: parent.id}),
        )
        return None

    # Update the activity to save some data about the reply
    if parent.get_actor().id == create.get_actor().id:
        author_data = {MetaKey.IN_REPLY_TO_SELF: True}
    else:
        author_data = {
            MetaKey.IN_REPLY_TO_ACTOR: parent.get_actor().to_dict(embed=True)
        }
    update_one_activity(
        by_remote_id(create.id),
        upsert({MetaKey.IN_REPLY_TO: in_reply_to, **author_data}),
    )

    # It's a regular reply, try to increment the reply counter
    parent_create = DB.activities.find_one_and_update(
        {**by_object_id(in_reply_to), **by_type(ap.ActivityType.CREATE)},
        inc(MetaKey.COUNT_REPLY, 1),
    )
    if not parent_create:
        # Maybe it's the reply of a reply?
        DB.replies.find_one_and_update(
            by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1)
        )

    # Spawn a task to process it (and determine if it needs to be saved)
    Tasks.process_reply(create.get_object_id())