diff --git a/activitypub.py b/activitypub.py index 7610a69..e4a64f3 100644 --- a/activitypub.py +++ b/activitypub.py @@ -107,7 +107,7 @@ class BaseActivity(object): # Initialize the object self._data: Dict[str, Any] = {'type': self.ACTIVITY_TYPE.value} - logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity') + logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs}') if 'id' in kwargs: self._data['id'] = kwargs.pop('id') @@ -687,15 +687,22 @@ class Delete(BaseActivity): ACTIVITY_TYPE = ActivityType.DELETE ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.TOMBSTONE] - def _recipients(self) -> List[str]: + def _get_actual_object(self) -> BaseActivity: obj = self.get_object() if obj.type_enum == ActivityType.TOMBSTONE: obj = parse_activity(OBJECT_SERVICE.get(obj.id)) + return obj + + def _recipients(self) -> List[str]: + obj = self._get_actual_object() return obj._recipients() def _process_from_inbox(self) -> None: DB.inbox.update_one({'activity.object.id': self.get_object().id}, {'$set': {'meta.deleted': True}}) - # TODO(tsileo): also delete copies stored in parents' `meta.replies` + obj = self._get_actual_object() + if obj.type_enum == ActivityType.NOTE: + obj._delete_from_threads() + # TODO(tsileo): also purge the cache if it's a reply of a published activity def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: @@ -773,36 +780,60 @@ class Create(BaseActivity): return recipients - def _process_from_inbox(self): + def _update_threads(self) -> None: + logger.debug('_update_threads hook') obj = self.get_object() - tasks.fetch_og.delay('INBOX', self.id) + # TODO(tsileo): re-enable me + # tasks.fetch_og.delay('INBOX', self.id) - in_reply_to = obj.inReplyTo - if in_reply_to: - parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) - if not parent: - DB.outbox.update_one( - {'activity.object.id': in_reply_to}, - {'$inc': {'meta.count_reply': 1}}, - ) - return + threads = [] + reply = obj.get_local_reply() + logger.debug(f'initial_reply={reply}') + reply_id = None + direct_reply = 1 + while reply is not None: + if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, { + '$inc': { + 'meta.count_reply': 1, + 'meta.count_direct_reply': direct_reply, + }, + }): + DB.outbox.update_one({'activity.object.id': reply.id}, { + '$inc': { + 'meta.count_reply': 1, + 'meta.count_direct_reply': direct_reply, + }, + }) - # If the note is a "reply of a reply" update the parent message - # TODO(tsileo): review this code - while parent: - DB.inbox.update_one({'_id': parent['_id']}, {'$push': {'meta.replies': self.to_dict()}}) - in_reply_to = parent.get('activity', {}).get('object', {}).get('inReplyTo') - if in_reply_to: - parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) - if parent is None: - # The reply is a note from the outbox - DB.outbox.update_one( - {'activity.object.id': in_reply_to}, - {'$inc': {'meta.count_reply': 1}}, - ) - else: - parent = None + direct_reply = 0 + reply_id = reply.id + reply = reply.get_local_reply() + logger.debug(f'next_reply={reply}') + if reply: + # Only append to threads if it's not the root + threads.append(reply_id) + + if reply_id: + if not DB.inbox.find_one_and_update({'activity.object.id': obj.id}, { + '$set': { + 'meta.thread_parents': threads, + 'meta.thread_root_parent': reply_id, + }, + }): + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$set': { + 'meta.thread_parents': threads, + 'meta.thread_root_parent': reply_id, + }, + }) + logger.debug('_update_threads done') + + def _process_from_inbox(self) -> None: + self._update_threads() + + def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: + self._update_threads() def _should_purge_cache(self) -> bool: # TODO(tsileo): handle reply of a reply... @@ -828,17 +859,9 @@ class Note(BaseActivity): # Remove the `actor` field as `attributedTo` is used for `Note` instead if 'actor' in self._data: del(self._data['actor']) - # FIXME(tsileo): use kwarg - # TODO(tsileo): support mention tag - # TODO(tisleo): implement the tag endpoint if 'sensitive' not in kwargs: self._data['sensitive'] = False - # FIXME(tsileo): add the tag in CC - # for t in kwargs.get('tag', []): - # if t['type'] == 'Mention': - # cc -> c['href'] - def _recipients(self) -> List[str]: # TODO(tsileo): audience support? recipients: List[str] = [] @@ -855,6 +878,51 @@ class Note(BaseActivity): return recipients + def _delete_from_threads(self) -> None: + logger.debug('_delete_from_threads hook') + + reply = self.get_local_reply() + logger.debug(f'initial_reply={reply}') + direct_reply = -1 + while reply is not None: + if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, { + '$inc': { + 'meta.count_reply': -1, + 'meta.count_direct_reply': direct_reply, + }, + }): + DB.outbox.update_one({'activity.object.id': reply.id}, { + '$inc': { + 'meta.count_reply': 1, + 'meta.count_direct_reply': direct_reply, + }, + }) + + direct_reply = 0 + reply = reply.get_local_reply() + logger.debug(f'next_reply={reply}') + + logger.debug('_delete_from_threads done') + return None + + def get_local_reply(self) -> Optional[BaseActivity]: + "Find the note reply if any.""" + in_reply_to = self.inReplyTo + if not in_reply_to: + # This is the root comment + return None + + inbox_parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) + if inbox_parent: + return parse_activity(inbox_parent['activity']['object']) + + outbox_parent = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) + if outbox_parent: + return parse_activity(outbox_parent['activity']['object']) + + # The parent is no stored on this instance + return None + def build_create(self) -> BaseActivity: """Wraps an activity in a Create activity.""" create_payload = { @@ -872,10 +940,10 @@ class Note(BaseActivity): def build_announce(self) -> BaseActivity: return Announce( - object=self.id, - to=[AS_PUBLIC], - cc=[ID+'/followers', self.attributedTo], - published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z', + object=self.id, + to=[AS_PUBLIC], + cc=[ID+'/followers', self.attributedTo], + published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z', ) def build_delete(self) -> BaseActivity: diff --git a/app.py b/app.py index 9d86289..3d15b95 100644 --- a/app.py +++ b/app.py @@ -30,6 +30,7 @@ from passlib.hash import bcrypt from u2flib_server import u2f from urllib.parse import urlparse, urlencode from werkzeug.utils import secure_filename +from flask_wtf.csrf import CSRFProtect import activitypub import config @@ -57,10 +58,17 @@ from utils.key import get_secret_key from utils.webfinger import get_remote_follow_template from utils.webfinger import get_actor_url + + + from typing import Dict, Any app = Flask(__name__) app.secret_key = get_secret_key('flask') +app.config.update( + WTF_CSRF_CHECK_DEFAULT=False, +) +csrf = CSRFProtect(app) logger = logging.getLogger(__name__) @@ -441,15 +449,21 @@ def webfinger(): def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]: + if raw_doc['activity']['type'] != ActivityType.CREATE.value: + return raw_doc + if 'col_likes' in raw_doc.get('meta', {}): col_likes = raw_doc['meta']['col_likes'] - if raw_doc['activity']['type'] == ActivityType.CREATE.value: - raw_doc['activity']['object']['likes'] = embed_collection(col_likes) + raw_doc['activity']['object']['likes'] = embed_collection(col_likes) + if 'col_shares' in raw_doc.get('meta', {}): col_shares = raw_doc['meta']['col_shares'] - if raw_doc['activity']['type'] == ActivityType.CREATE.value: - raw_doc['activity']['object']['shares'] = embed_collection(col_shares) + raw_doc['activity']['object']['shares'] = embed_collection(col_shares) + if 'count_direct_reply' in raw_doc.get('meta', {}): + # FIXME(tsileo): implements the collection handler + raw_doc['activity']['object']['replies'] = {'type': 'Collection', 'totalItems': raw_doc['meta']['count_direct_reply']} + return raw_doc diff --git a/requirements.txt b/requirements.txt index 3e770ad..425405f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ requests markdown python-u2flib-server Flask +Flask-WTF Celery pymongo pyld diff --git a/tests/federation_test.py b/tests/federation_test.py index a5bd1cf..d65c35d 100644 --- a/tests/federation_test.py +++ b/tests/federation_test.py @@ -297,6 +297,7 @@ def test_post_content_and_like(): assert len(note['likes']['items']) == 1 assert note['likes']['items'][0]['id'] == like_id + def test_post_content_and_like_unlike(): instance1, instance2 = _instances() # Instance1 follows instance2 @@ -332,6 +333,7 @@ def test_post_content_and_like_unlike(): assert 'likes' in note assert len(note['likes']['items']) == 0 + def test_post_content_and_boost(): instance1, instance2 = _instances() # Instance1 follows instance2 @@ -426,8 +428,54 @@ def test_post_content_and_post_reply(): assert len(instance1_inbox_stream['items']) == 1 assert instance1_inbox_stream['items'][0]['id'] == instance2_create_id - # TODO(tsileo): find the activity and check the `replies` collection + instance1_note = instance1.outbox_get(f'{instance1_create_id}/activity') + assert 'replies' in instance1_note + assert instance1_note['replies']['totalItems'] == 1 + # TODO(tsileo): inspect the `replies` collection -# TODO(tsileo): -# def test_post_content_and_post_reply_and_delete(): +def test_post_content_and_post_reply_and_delete(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 0 + + instance1_create_id = instance1.new_note('hello') + instance2_debug = instance2.debug() + assert instance2_debug['inbox'] == 3 # An Follow, Accept and Create activity should be there + assert instance2_debug['outbox'] == 2 # We've sent a Accept and a Follow activity + + # Ensure the post is visible in instance2's stream + instance2_inbox_stream = instance2.stream_jsonfeed() + assert len(instance2_inbox_stream['items']) == 1 + assert instance2_inbox_stream['items'][0]['id'] == instance1_create_id + + instance2_create_id = instance2.new_note(f'hey @instance1@{instance1.docker_url}', reply=f'{instance1_create_id}/activity') + instance2_debug = instance2.debug() + assert instance2_debug['inbox'] == 3 # An Follow, Accept and Create activity should be there + assert instance2_debug['outbox'] == 3 # We've sent a Accept and a Follow and a Create activity + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 3 # An Follow, Accept and Create activity should be there + assert instance1_debug['outbox'] == 3 # We've sent a Accept and a Follow and a Create activity + + instance1_inbox_stream = instance1.stream_jsonfeed() + assert len(instance1_inbox_stream['items']) == 1 + assert instance1_inbox_stream['items'][0]['id'] == instance2_create_id + + instance1_note = instance1.outbox_get(f'{instance1_create_id}/activity') + assert 'replies' in instance1_note + assert instance1_note['replies']['totalItems'] == 1 + + instance2.delete(f'{instance2_create_id}/activity') + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 4 # An Follow, Accept and Create and Delete activity should be there + assert instance1_debug['outbox'] == 3 # We've sent a Accept and a Follow and a Create activity + + instance1_note = instance1.outbox_get(f'{instance1_create_id}/activity') + assert 'replies' in instance1_note + assert instance1_note['replies']['totalItems'] == 0