Improve reply processing

This commit is contained in:
Thomas Sileo 2019-08-16 14:42:15 +02:00
parent 558fb9c310
commit 5da44298a8
4 changed files with 67 additions and 44 deletions

View file

@ -21,6 +21,7 @@ from core.activitypub import SIG_AUTH
from core.activitypub import Box from core.activitypub import Box
from core.activitypub import _actor_hash from core.activitypub import _actor_hash
from core.activitypub import _add_answers_to_question from core.activitypub import _add_answers_to_question
from core.activitypub import _cache_actor_icon
from core.activitypub import post_to_outbox from core.activitypub import post_to_outbox
from core.activitypub import save_reply from core.activitypub import save_reply
from core.activitypub import update_cached_actor from core.activitypub import update_cached_actor
@ -30,7 +31,9 @@ from core.inbox import process_inbox
from core.meta import MetaKey from core.meta import MetaKey
from core.meta import by_object_id from core.meta import by_object_id
from core.meta import by_remote_id from core.meta import by_remote_id
from core.meta import by_type
from core.meta import flag from core.meta import flag
from core.meta import inc
from core.meta import upsert from core.meta import upsert
from core.notifications import set_inbox_flags from core.notifications import set_inbox_flags
from core.outbox import process_outbox from core.outbox import process_outbox
@ -250,7 +253,10 @@ def task_cache_attachments() -> _Response:
app.logger.info(f"caching attachment for activity={activity!r}") app.logger.info(f"caching attachment for activity={activity!r}")
# Generates thumbnails for the actor's icon and the attachments if any # Generates thumbnails for the actor's icon and the attachments if any
if activity.has_type([ap.ActivityType.CREATE, ap.ActivityType.ANNOUNCE]):
obj = activity.get_object() obj = activity.get_object()
else:
obj = activity
if obj.has_type(ap.ActivityType.VIDEO): if obj.has_type(ap.ActivityType.VIDEO):
if isinstance(obj.url, list): if isinstance(obj.url, list):
@ -515,37 +521,67 @@ def task_process_reply() -> _Response:
app.logger.info(f"activity={activity!r} is not a reply, dropping it") app.logger.info(f"activity={activity!r} is not a reply, dropping it")
return "" return ""
# new_threads = []
root_reply = in_reply_to root_reply = in_reply_to
reply = ap.fetch_remote_activity(root_reply)
reply = ap.fetch_remote_activity(in_reply_to)
if reply.has_type(ap.ActivityType.CREATE): if reply.has_type(ap.ActivityType.CREATE):
reply = reply.get_object() reply = reply.get_object()
while reply is not None: new_replies = [activity, reply]
while 1:
in_reply_to = reply.get_in_reply_to() in_reply_to = reply.get_in_reply_to()
if not in_reply_to: if not in_reply_to:
break break
root_reply = in_reply_to root_reply = in_reply_to
reply = ap.fetch_remote_activity(root_reply) reply = ap.fetch_remote_activity(root_reply)
if reply.has_type(ap.ActivityType.CREATE): if reply.has_type(ap.ActivityType.CREATE):
reply = reply.get_object() reply = reply.get_object()
new_replies.append(reply)
app.logger.info(f"root_reply={reply!r} for activity={activity!r}") app.logger.info(f"root_reply={reply!r} for activity={activity!r}")
# Ensure the "root reply" is present in the inbox/outbox # Ensure the "root reply" is present in the inbox/outbox
if not find_one_activity(by_object_id(root_reply)): if not find_one_activity(by_object_id(root_reply)):
return "" return ""
actor = activity.get_actor() for new_reply in new_replies:
if find_one_activity(by_object_id(new_reply.id)) or DB.replies.find_one(
{"remote_id": root_reply}
):
continue
actor = new_reply.get_actor()
save_reply( save_reply(
activity, new_reply,
{ {
"meta.thread_root_parent": root_reply, "meta.thread_root_parent": root_reply,
**flag(MetaKey.ACTOR, actor.to_dict(embed=True)), **flag(MetaKey.ACTOR, actor.to_dict(embed=True)),
**flag(MetaKey.ACTOR_HASH, _actor_hash(actor)),
}, },
) )
# FIXME(tsileo): cache actor here, spawn a task to cache attachment if needed
# Update the reply counters
if new_reply.get_in_reply_to():
update_one_activity(
{
**by_object_id(new_reply.get_in_reply_to()),
**by_type(ap.ActivityType.CREATE),
},
inc(MetaKey.COUNT_REPLY, 1),
)
DB.replies.update_one(
by_remote_id(new_reply.get_in_reply_to()),
inc(MetaKey.COUNT_REPLY, 1),
)
# Cache the actor icon
_cache_actor_icon(actor)
# And cache the attachments
Tasks.cache_attachments(new_reply.id)
except (ActivityGoneError, ActivityNotFoundError): except (ActivityGoneError, ActivityNotFoundError):
app.logger.exception(f"dropping activity {iri}, skip processing") app.logger.exception(f"dropping activity {iri}, skip processing")
return "" return ""

View file

@ -34,9 +34,12 @@ from core.db import update_many_activities
from core.meta import Box from core.meta import Box
from core.meta import MetaKey from core.meta import MetaKey
from core.meta import by_object_id from core.meta import by_object_id
from core.meta import by_remote_id
from core.meta import flag from core.meta import flag
from core.meta import inc
from core.meta import upsert from core.meta import upsert
from core.tasks import Tasks from core.tasks import Tasks
from utils import now
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -187,6 +190,7 @@ def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None:
if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]: if visibility in [ap.Visibility.PUBLIC, ap.Visibility.UNLISTED]:
is_public = True is_public = True
published = activity.published if activity.published else now()
DB.replies.insert_one( DB.replies.insert_one(
{ {
"activity": activity.to_dict(), "activity": activity.to_dict(),
@ -199,6 +203,7 @@ def save_reply(activity: ap.BaseActivity, meta: Dict[str, Any] = {}) -> None:
"server": urlparse(activity.id).netloc, "server": urlparse(activity.id).netloc,
"visibility": visibility.name, "visibility": visibility.name,
"actor_id": activity.get_actor().id, "actor_id": activity.get_actor().id,
MetaKey.PUBLISHED.value: published,
**meta, **meta,
}, },
} }
@ -467,10 +472,9 @@ class MicroblogPubBackend(Backend):
if not in_reply_to: if not in_reply_to:
return return
new_threads = [] reply = ap.fetch_remote_activity(in_reply_to)
root_reply = in_reply_to if reply.has_type(ap.ActivityType.CREATE):
reply = ap.fetch_remote_activity(root_reply) reply = reply.get_object()
# FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that)
# FIXME(tsileo): can be a 403 too, in this case what to do? not error at least # FIXME(tsileo): can be a 403 too, in this case what to do? not error at least
# Ensure the this is a local reply, of a question, with a direct "to" addressing # Ensure the this is a local reply, of a question, with a direct "to" addressing
@ -497,35 +501,17 @@ class MicroblogPubBackend(Backend):
) )
return None return None
# It's a regular reply, try to increment the reply counter
creply = DB.activities.find_one_and_update( creply = DB.activities.find_one_and_update(
{"activity.object.id": in_reply_to}, by_object_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1)
{"$inc": {"meta.count_reply": 1, "meta.count_direct_reply": 1}},
) )
if not creply: if not creply:
# It means the activity is not in the inbox, and not in the outbox, we want to save it # Maybe it's the reply of a reply?
save(Box.REPLIES, reply) if not DB.replies.find_one_and_update(
new_threads.append(reply.id) by_remote_id(in_reply_to), inc(MetaKey.COUNT_REPLY, 1)
# TODO(tsileo): parses the replies collection and import the replies? ):
# We don't have the reply stored, spawn a task to process it (and determine if it needs to be saved)
while reply is not None: Tasks.process_reply(create.get_object().id)
in_reply_to = reply.get_in_reply_to()
if not in_reply_to:
break
root_reply = in_reply_to
reply = ap.fetch_remote_activity(root_reply)
# FIXME(tsileo): can be a Create here (instead of a Note, Hubzilla's doing that)
q = {"activity.object.id": root_reply}
if not DB.activities.count(q):
save(Box.REPLIES, reply)
new_threads.append(reply.id)
DB.activities.update_one(
{"remote_id": create.id}, {"$set": {"meta.thread_root_parent": root_reply}}
)
DB.activities.update(
{"box": Box.REPLIES.value, "remote_id": {"$in": new_threads}},
{"$set": {"meta.thread_root_parent": root_reply}},
)
def embed_collection(total_items, first_page_id): def embed_collection(total_items, first_page_id):

View file

@ -42,6 +42,7 @@ class MetaKey(Enum):
COUNT_LIKE = "count_like" COUNT_LIKE = "count_like"
COUNT_BOOST = "count_boost" COUNT_BOOST = "count_boost"
COUNT_REPLY = "count_reply"
def _meta(mk: MetaKey) -> str: def _meta(mk: MetaKey) -> str:

View file

@ -170,7 +170,7 @@
{% if request.path == url_for("admin.admin_lookup") %} {% if request.path == url_for("admin.admin_lookup") %}
<input type="hidden" name="url" value="{{obj.id}}"> <input type="hidden" name="url" value="{{obj.id}}">
{% endif %} {% endif %}
<button type="submit" class="bar-item-reverse">hide sensitive content</button> <button type="submit" class="bar-item">hide sensitive content</button>
</form> </form>
</div> </div>
{% endif %} {% endif %}