From 5da44298a81962ec7f42cbaf841e0133ec818f86 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 16 Aug 2019 14:42:15 +0200 Subject: [PATCH] Improve reply processing --- blueprints/tasks.py | 62 ++++++++++++++++++++++++++++++++++---------- core/activitypub.py | 46 ++++++++++++-------------------- core/meta.py | 1 + templates/utils.html | 2 +- 4 files changed, 67 insertions(+), 44 deletions(-) diff --git a/blueprints/tasks.py b/blueprints/tasks.py index 4ffff69..0070e09 100644 --- a/blueprints/tasks.py +++ b/blueprints/tasks.py @@ -21,6 +21,7 @@ from core.activitypub import SIG_AUTH from core.activitypub import Box from core.activitypub import _actor_hash from core.activitypub import _add_answers_to_question +from core.activitypub import _cache_actor_icon from core.activitypub import post_to_outbox from core.activitypub import save_reply from core.activitypub import update_cached_actor @@ -30,7 +31,9 @@ from core.inbox import process_inbox from core.meta import MetaKey from core.meta import by_object_id from core.meta import by_remote_id +from core.meta import by_type from core.meta import flag +from core.meta import inc from core.meta import upsert from core.notifications import set_inbox_flags from core.outbox import process_outbox @@ -250,7 +253,10 @@ def task_cache_attachments() -> _Response: app.logger.info(f"caching attachment for activity={activity!r}") # Generates thumbnails for the actor's icon and the attachments if any - obj = activity.get_object() + if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]): + obj = activity.get_object() + else: + obj = activity if obj.has_type(ap.ActivityType.VIDEO): if isinstance(obj.url, list): @@ -515,37 +521,67 @@ def task_process_reply() -> _Response: app.logger.info(f"activity={activity!r} is not a reply, dropping it") return "" - # new_threads = [] root_reply = in_reply_to - reply = ap.fetch_remote_activity(root_reply) + + reply = ap.fetch_remote_activity(in_reply_to) if reply.has_type(ap.ActivityType.CREATE): reply = reply.get_object() - while reply is not None: + new_replies = [activity, reply] + + while 1: in_reply_to = reply.get_in_reply_to() if not in_reply_to: break + root_reply = in_reply_to reply = ap.fetch_remote_activity(root_reply) + if reply.has_type(ap.ActivityType.CREATE): reply = reply.get_object() + new_replies.append(reply) + app.logger.info(f"root_reply={reply!r} for activity={activity!r}") # Ensure the "root reply" is present in the inbox/outbox if not find_one_activity(by_object_id(root_reply)): return "" - actor = activity.get_actor() + for new_reply in new_replies: + if find_one_activity(by_object_id(new_reply.id)) or DB.replies.find_one( + {"remote_id": root_reply} + ): + continue - save_reply( - activity, - { - "meta.thread_root_parent": root_reply, - **flag(MetaKey.ACTOR, actor.to_dict(embed=True)), - }, - ) - # FIXME(tsileo): cache actor here, spawn a task to cache attachment if needed + actor = new_reply.get_actor() + save_reply( + new_reply, + { + "meta.thread_root_parent": root_reply, + **flag(MetaKey.ACTOR, actor.to_dict(embed=True)), + **flag(MetaKey.ACTOR_HASH, _actor_hash(actor)), + }, + ) + + # Update the reply counters + if new_reply.get_in_reply_to(): + update_one_activity( + { + **by_object_id(new_reply.get_in_reply_to()), + **by_type(ap.ActivityType.CREATE), + }, + inc(MetaKey.COUNT_REPLY, 1), + ) + DB.replies.update_one( + by_remote_id(new_reply.get_in_reply_to()), + inc(MetaKey.COUNT_REPLY, 1), + ) + + # Cache the actor icon + _cache_actor_icon(actor) + # And cache the attachments + Tasks.cache_attachments(new_reply.id) except (ActivityGoneError, ActivityNotFoundError): app.logger.exception(f"dropping activity {iri}, skip processing") return "" diff --git a/core/activitypub.py b/core/activitypub.py index dc4f06a..1be68f7 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -34,9 +34,12 @@ from core.db import update_many_activities from core.meta import Box from core.meta import MetaKey from core.meta import by_object_id +from core.meta import by_remote_id from core.meta import flag +from core.meta import inc from core.meta import upsert from core.tasks import Tasks +from utils import now logger = logging.getLogger(__name__) @@ -187,6 +190,7 @@ def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None: if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: is_public = True + published = activity.published if activity.published else now() DB.replies.insert_one( { "activity": activity.to_dict(), @@ -199,6 +203,7 @@ def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None: "server": urlparse(activity.id).netloc, "visibility": visibility.name, "actor_id": activity.get_actor().id, + MetaKey.PUBLISHED.value: published, **meta, }, } @@ -467,10 +472,9 @@ class MicroblogPubBackend(Backend): if not in_reply_to: return - new_threads = [] - root_reply = in_reply_to - reply = ap.fetch_remote_activity(root_reply) - # FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that) + reply = ap.fetch_remote_activity(in_reply_to) + if reply.has_type(ap.ActivityType.CREATE): + reply = reply.get_object() # FIXME(tsileo): can be a 403 too, in this case what to do? not error at least # Ensure the this is a local reply, of a question, with a direct "to" addressing @@ -497,35 +501,17 @@ class MicroblogPubBackend(Backend): ) return None + # It's a regular reply, try to increment the reply counter creply = DB.activities.find_one_and_update( - {"activity.object.id": in_reply_to}, - {"$inc": {"meta.count_reply": 1, "meta.count_direct_reply": 1}}, + by_object_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1) ) if not creply: - # It means the activity is not in the inbox, and not in the outbox, we want to save it - save(Box.REPLIES, reply) - new_threads.append(reply.id) - # TODO(tsileo): parses the replies collection and import the replies? - - while reply is not None: - in_reply_to = reply.get_in_reply_to() - if not in_reply_to: - break - root_reply = in_reply_to - reply = ap.fetch_remote_activity(root_reply) - # FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that) - q = {"activity.object.id": root_reply} - if not DB.activities.count(q): - save(Box.REPLIES, reply) - new_threads.append(reply.id) - - DB.activities.update_one( - {"remote_id": create.id}, {"$set": {"meta.thread_root_parent": root_reply}} - ) - DB.activities.update( - {"box": Box.REPLIES.value, "remote_id": {"$in": new_threads}}, - {"$set": {"meta.thread_root_parent": root_reply}}, - ) + # Maybe it's the reply of a reply? + if not DB.replies.find_one_and_update( + by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1) + ): + # We don't have the reply stored, spawn a task to process it (and determine if it needs to be saved) + Tasks.process_reply(create.get_object().id) def embed_collection(total_items, first_page_id): diff --git a/core/meta.py b/core/meta.py index 7e9e762..0c99127 100644 --- a/core/meta.py +++ b/core/meta.py @@ -42,6 +42,7 @@ class MetaKey(Enum): COUNT_LIKE = "count_like" COUNT_BOOST = "count_boost" + COUNT_REPLY = "count_reply" def _meta(mk: MetaKey) -> str: diff --git a/templates/utils.html b/templates/utils.html index 4c4dc26..d6d298a 100644 --- a/templates/utils.html +++ b/templates/utils.html @@ -170,7 +170,7 @@ {% if request.path == url_for("admin.admin_lookup") %} {% endif %} - + {% endif %}