From 388c024054bb700fc937b939b2905a05329ea52c Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Sun, 18 Aug 2019 12:39:19 +0200 Subject: [PATCH] Cleanup replies handling --- core/activitypub.py | 204 ++++++++++++++++++++++---------------------- core/inbox.py | 3 +- core/outbox.py | 4 +- 3 files changed, 106 insertions(+), 105 deletions(-) diff --git a/core/activitypub.py b/core/activitypub.py index 3e071f7..dd4e58e 100644 --- a/core/activitypub.py +++ b/core/activitypub.py @@ -424,108 +424,6 @@ class MicroblogPubBackend(Backend): {"$inc": {"meta.count_reply": -1, "meta.count_direct_reply": -1}}, ) - def _process_question_reply(self, create: ap.Create, question: ap.Question) -> None: - choice = create.get_object().name - - # Ensure it's a valid choice - if choice not in [ - c["name"] for c in question._data.get("oneOf", question.anyOf) - ]: - logger.info("invalid choice") - return - - # Hash the choice/answer (so we can use it as a key) - answer_key = _answer_key(choice) - - is_single_choice = bool(question._data.get("oneOf", [])) - dup_query = { - "activity.object.actor": create.get_actor().id, - "meta.answer_to": question.id, - **({} if is_single_choice else {"meta.poll_answer_choice": choice}), - } - - print(f"dup_q={dup_query}") - # Check for duplicate votes - if DB.activities.find_one(dup_query): - logger.info("duplicate response") - return - - # Update the DB - - DB.activities.update_one( - {"meta.object_id": question.id}, - { - "$inc": { - "meta.question_replies": 1, - f"meta.question_answers.{answer_key}": 1, - } - }, - ) - - DB.activities.update_one( - {"remote_id": create.id}, - { - "$set": { - "meta.answer_to": question.id, - "meta.poll_answer_choice": choice, - "meta.stream": False, - "meta.poll_answer": True, - } - }, - ) - - return None - - def _handle_replies(self, as_actor: ap.Person, create: ap.Create) -> None: - """Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent" - key to make it easy to query a whole thread.""" - in_reply_to = create.get_object().get_in_reply_to() - if not in_reply_to: - return - - reply = ap.fetch_remote_activity(in_reply_to) - if reply.has_type(ap.ActivityType.CREATE): - reply = reply.get_object() - # FIXME(tsileo): can be a 403 too, in this case what to do? not error at least - - # Ensure the this is a local reply, of a question, with a direct "to" addressing - if ( - reply.id.startswith(BASE_URL) - and reply.has_type(ap.ActivityType.QUESTION.value) - and _is_local_reply(create) - and not create.is_public() - ): - return self._process_question_reply(create, reply) - elif ( - create.id.startswith(BASE_URL) - and reply.has_type(ap.ActivityType.QUESTION.value) - and not create.is_public() - ): - # Keep track of our own votes - DB.activities.update_one( - {"activity.object.id": reply.id, "box": "inbox"}, - { - "$set": { - f"meta.poll_answers_sent.{_answer_key(create.get_object().name)}": True - } - }, - ) - return None - - # It's a regular reply, try to increment the reply counter - creply = DB.activities.find_one_and_update( - {**by_object_id(in_reply_to), **by_type(ap.ActivityType.CREATE)}, - inc(MetaKey.COUNT_REPLY, 1), - ) - if not creply: - # Maybe it's the reply of a reply? - DB.replies.find_one_and_update( - by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1) - ) - - # Spawn a task to process it (and determine if it needs to be saved) - Tasks.process_reply(create.get_object().id) - def embed_collection(total_items, first_page_id): """Helper creating a root OrderedCollection with a link to the first page.""" @@ -736,3 +634,105 @@ def update_cached_actor(actor: ap.BaseActivity) -> None: # {"meta.object_id": actor.id}, {"$set": {"meta.object": actor.to_dict(embed=True)}} # ) _cache_actor_icon(actor) + + +def handle_question_reply(create: ap.Create, question: ap.Question) -> None: + choice = create.get_object().name + + # Ensure it's a valid choice + if choice not in [c["name"] for c in question._data.get("oneOf", question.anyOf)]: + logger.info("invalid choice") + return + + # Hash the choice/answer (so we can use it as a key) + answer_key = _answer_key(choice) + + is_single_choice = bool(question._data.get("oneOf", [])) + dup_query = { + "activity.object.actor": create.get_actor().id, + "meta.answer_to": question.id, + **({} if is_single_choice else {"meta.poll_answer_choice": choice}), + } + + print(f"dup_q={dup_query}") + # Check for duplicate votes + if DB.activities.find_one(dup_query): + logger.info("duplicate response") + return + + # Update the DB + + DB.activities.update_one( + {**by_object_id(question.id), **by_type(ap.ActivityType.CREATE)}, + { + "$inc": { + "meta.question_replies": 1, + f"meta.question_answers.{answer_key}": 1, + } + }, + ) + + DB.activities.update_one( + by_remote_id(create.id), + { + "$set": { + "meta.answer_to": question.id, + "meta.poll_answer_choice": choice, + "meta.stream": False, + "meta.poll_answer": True, + } + }, + ) + + return None + + +def handle_replies(create: ap.Create) -> None: + """Go up to the root reply, store unknown replies in the `threads` DB and set the "meta.thread_root_parent" + key to make it easy to query a whole thread.""" + in_reply_to = create.get_object().get_in_reply_to() + if not in_reply_to: + return + + reply = ap.fetch_remote_activity(in_reply_to) + if reply.has_type(ap.ActivityType.CREATE): + reply = reply.get_object() + # FIXME(tsileo): can be a 403 too, in this case what to do? not error at least + + # Ensure the this is a local reply, of a question, with a direct "to" addressing + if ( + reply.id.startswith(BASE_URL) + and reply.has_type(ap.ActivityType.QUESTION.value) + and _is_local_reply(create) + and not create.is_public() + ): + return handle_question_reply(create, reply) + elif ( + create.id.startswith(BASE_URL) + and reply.has_type(ap.ActivityType.QUESTION.value) + and not create.is_public() + ): + # Keep track of our own votes + DB.activities.update_one( + {"activity.object.id": reply.id, "box": "inbox"}, + { + "$set": { + f"meta.poll_answers_sent.{_answer_key(create.get_object().name)}": True + } + }, + ) + return None + + # It's a regular reply, try to increment the reply counter + creply = DB.activities.find_one_and_update( + {**by_object_id(in_reply_to), **by_type(ap.ActivityType.CREATE)}, + inc(MetaKey.COUNT_REPLY, 1), + ) + if not creply: + # Maybe it's the reply of a reply? + DB.replies.find_one_and_update( + by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1) + ) + + # Spawn a task to process it (and determine if it needs to be saved) + Tasks.process_reply(create.get_object().id) diff --git a/core/inbox.py b/core/inbox.py index 18c13b1..0208e85 100644 --- a/core/inbox.py +++ b/core/inbox.py @@ -8,6 +8,7 @@ from little_boxes.errors import NotAnActivityError import config from core.activitypub import _answer_key +from core.activitypub import handle_replies from core.activitypub import post_to_outbox from core.activitypub import update_cached_actor from core.db import DB @@ -108,7 +109,7 @@ def _create_process_inbox(create: ap.Create, new_meta: _NewMeta) -> None: if question.has_type(ap.ActivityType.QUESTION): Tasks.fetch_remote_question(question) - back._handle_replies(MY_PERSON, create) + handle_replies(create) @process_inbox.register diff --git a/core/outbox.py b/core/outbox.py index a47d5b5..01e62c3 100644 --- a/core/outbox.py +++ b/core/outbox.py @@ -6,6 +6,7 @@ from typing import Dict from little_boxes import activitypub as ap +from core.activitypub import handle_replies from core.db import find_one_activity from core.db import update_many_activities from core.db import update_one_activity @@ -15,7 +16,6 @@ from core.meta import by_type from core.meta import inc from core.meta import upsert from core.shared import MY_PERSON -from core.shared import back from core.tasks import Tasks _logger = logging.getLogger(__name__) @@ -87,7 +87,7 @@ def _update_process_outbox(update: ap.Update, new_meta: _NewMeta) -> None: @process_outbox.register def _create_process_outbox(create: ap.Create, new_meta: _NewMeta) -> None: _logger.info(f"process_outbox activity={create!r}") - back._handle_replies(MY_PERSON, create) + handle_replies(create) @process_outbox.register