microblog.pub/core/notifications.py
2019-08-15 17:30:40 +02:00

212 lines
6.6 KiB
Python

import logging
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from functools import singledispatch
from typing import Any
from typing import Dict
from urllib.parse import urlparse
from little_boxes import activitypub as ap
from config import BASE_URL
from config import DB
from core.db import find_one_activity
from core.meta import MetaKey
from core.meta import _meta
from core.meta import by_actor
from core.meta import by_object_id
from core.meta import by_type
from core.meta import flag
from core.meta import in_inbox
from core.meta import not_undo
from core.meta import published_after
from core.tasks import Tasks
_logger = logging.getLogger(__name__)
_NewMeta = Dict[str, Any]
_LOCAL_NETLOC = urlparse(BASE_URL).netloc
def _is_from_outbox(activity: ap.BaseActivity) -> bool:
return activity.id.startswith(BASE_URL)
def _is_local(url: str) -> bool:
return urlparse(url).netloc == _LOCAL_NETLOC
def _flag_as_notification(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
new_meta.update(
{_meta(MetaKey.NOTIFICATION): True, _meta(MetaKey.NOTIFICATION_UNREAD): True}
)
return None
def _set_flag(meta: _NewMeta, meta_key: MetaKey, value: Any = True) -> None:
meta.update({_meta(meta_key): value})
return None
@singledispatch
def set_inbox_flags(activity: ap.BaseActivity, new_meta: _NewMeta) -> None:
_logger.warning(f"skipping {activity!r}")
return None
@set_inbox_flags.register
def _accept_set_inbox_flags(activity: ap.Accept, new_meta: _NewMeta) -> None:
"""Handle notifications for "accepted" following requests."""
_logger.info(f"set_inbox_flags activity={activity!r}")
# Check if this actor already follow us back
follows_back = False
follow_query = {
**in_inbox(),
**by_type(ap.ActivityType.FOLLOW),
**by_actor(activity.get_actor()),
**not_undo(),
}
raw_follow = DB.activities.find_one(follow_query)
if raw_follow:
follows_back = True
DB.activities.update_many(
follow_query, {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}
)
# This Accept will be a "You started following $actor" notification
_flag_as_notification(activity, new_meta)
_set_flag(new_meta, MetaKey.GC_KEEP)
_set_flag(new_meta, MetaKey.NOTIFICATION_FOLLOWS_BACK, follows_back)
return None
@set_inbox_flags.register
def _follow_set_inbox_flags(activity: ap.Follow, new_meta: _NewMeta) -> None:
"""Handle notification for new followers."""
_logger.info(f"set_inbox_flags activity={activity!r}")
# Check if we're already following this actor
follows_back = False
accept_query = {
**in_inbox(),
**by_type(ap.ActivityType.ACCEPT),
**by_actor(activity.get_actor()),
**not_undo(),
}
raw_accept = DB.activities.find_one(accept_query)
if raw_accept:
follows_back = True
DB.activities.update_many(
accept_query, {"$set": {_meta(MetaKey.NOTIFICATION_FOLLOWS_BACK): True}}
)
# This Follow will be a "$actor started following you" notification
_flag_as_notification(activity, new_meta)
_set_flag(new_meta, MetaKey.GC_KEEP)
_set_flag(new_meta, MetaKey.NOTIFICATION_FOLLOWS_BACK, follows_back)
return None
@set_inbox_flags.register
def _like_set_inbox_flags(activity: ap.Like, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
# Is it a Like of local acitivty/from the outbox
if _is_from_outbox(activity.get_object()):
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Cache the object (for display on the notifcation page)
Tasks.cache_object(activity.id)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
@set_inbox_flags.register
def _announce_set_inbox_flags(activity: ap.Announce, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
obj = activity.get_object()
# Is it a Annnounce/boost of local acitivty/from the outbox
if _is_from_outbox(obj):
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
# Dedup boosts (it's annoying to see the same note multipe times on the same page)
if not find_one_activity(
{
**in_inbox(),
**by_object_id(obj.id),
**flag(MetaKey.STREAM, True),
**published_after(datetime.now(timezone.utc) - timedelta(hours=12)),
}
):
# Display it in the stream only it not there already (only looking at the last 12 hours)
_set_flag(new_meta, MetaKey.STREAM)
return None
@set_inbox_flags.register
def _undo_set_inbox_flags(activity: ap.Undo, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
obj = activity.get_object()
if obj.has_type(ap.ActivityType.FOLLOW):
# Flag it as a noticiation (for the "$actor unfollowed you"
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
@set_inbox_flags.register
def _create_set_inbox_flags(activity: ap.Create, new_meta: _NewMeta) -> None:
_logger.info(f"set_inbox_flags activity={activity!r}")
obj = activity.get_object()
_set_flag(new_meta, MetaKey.POLL_ANSWER, False)
in_reply_to = obj.get_in_reply_to()
# Check if it's a local reply
if in_reply_to and _is_local(in_reply_to):
# TODO(tsileo): fetch the reply to check for poll answers more precisely
# reply_of = ap.fetch_remote_activity(in_reply_to)
# Ensure it's not a poll answer
if obj.name and not obj.content:
_set_flag(new_meta, MetaKey.POLL_ANSWER)
return None
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
return None
# Check for mention
for mention in obj.get_mentions():
if mention.href and _is_local(mention.href):
# Flag it as a notification
_flag_as_notification(activity, new_meta)
# Also set the "keep mark" for the GC (as we want to keep it forever)
_set_flag(new_meta, MetaKey.GC_KEEP)
if not in_reply_to:
# A good candidate for displaying in the stream
_set_flag(new_meta, MetaKey.STREAM)
return None