diff --git a/.dockerignore b/.dockerignore index 35249de..ccc0367 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,2 +1,3 @@ +__pycache__/ data/ tests/ diff --git a/activitypub.py b/activitypub.py index 3ce0441..252feca 100644 --- a/activitypub.py +++ b/activitypub.py @@ -251,16 +251,19 @@ class BaseActivity(object): self.__obj: BaseActivity = p return p - def _to_dict(self, data: ObjectType) -> ObjectType: - return data - - def to_dict(self, embed: bool = False) -> ObjectType: + def to_dict(self, embed: bool = False, embed_object_id_only: bool = False) -> ObjectType: data = dict(self._data) if embed: for k in ['@context', 'signature']: if k in data: del(data[k]) - return self._to_dict(data) + if data.get('object') and embed_object_id_only and isinstance(data['object'], dict): + try: + data['object'] = data['object']['id'] + except KeyError: + raise BadActivityError('embedded object does not have an id') + + return data def get_actor(self) -> 'BaseActivity': actor = self._data.get('actor') @@ -424,11 +427,6 @@ class Person(BaseActivity): def _verify(self) -> None: ACTOR_SERVICE.get(self._data['id']) - def _to_dict(self, data): - # if 'icon' in data: - # data['icon'] = data['icon'].to_dict() - return data - class Block(BaseActivity): ACTIVITY_TYPE = ActivityType.BLOCK @@ -568,12 +566,19 @@ class Like(BaseActivity): def _process_from_inbox(self): obj = self.get_object() # Update the meta counter if the object is published by the server - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_like': 1}}) + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_like': 1}, + '$addToSet': {'meta.col_likes': self.to_dict(embed=True, embed_object_id_only=True)}, + }) + # XXX(tsileo): notification?? def _undo_inbox(self) -> None: obj = self.get_object() # Update the meta counter if the object is published by the server - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_like': -1}}) + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_like': -1}, + '$pull': {'meta.col_likes': {'id': self.id}}, + }) def _undo_should_purge_cache(self) -> bool: # If a like coutn was decremented, we need to purge the application cache @@ -582,19 +587,26 @@ class Like(BaseActivity): def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]): obj = self.get_object() # Unlikely, but an actor can like it's own post - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_like': 1}}) + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_like': 1}, + '$addToSet': {'meta.col_likes': self.to_dict(embed=True, embed_object_id_only=True)}, + }) + # Keep track of the like we just performed DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': obj_id}}) def _undo_outbox(self) -> None: obj = self.get_object() # Unlikely, but an actor can like it's own post - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_like': -1}}) + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_like': -1}, + '$pull': {'meta.col_likes': {'id': self.id}}, + }) DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': False}}) def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True)) + return Undo(object=self.to_dict(embed=True, embed_object_id_only=True)) class Announce(BaseActivity): @@ -613,7 +625,9 @@ class Announce(BaseActivity): def _process_from_inbox(self) -> None: if isinstance(self._data['object'], str) and not self._data['object'].startswith('http'): # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else - print(f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message') + logger.warn( + f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message' + ) return # Save/cache the object, and make it part of the stream so we can fetch it if isinstance(self._data['object'], str): @@ -626,12 +640,18 @@ class Announce(BaseActivity): obj = parse_activity(raw_obj) else: obj = self.get_object() - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_boost': 1}}) + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_boost': 1}, + '$addToSet': {'meta.col_shares': self.to_dict(embed=True, embed_object_id_only=True)}, + }) def _undo_inbox(self) -> None: obj = self.get_object() - DB.inbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}}) - DB.outbox.update_one({'activity.object.id': obj.id}, {'$inc': {'meta.count_boost': -1}}) + # Update the meta counter if the object is published by the server + DB.outbox.update_one({'activity.object.id': obj.id}, { + '$inc': {'meta.count_boost': -1}, + '$pull': {'meta.col_shares': {'id': self.id}}, + }) def _undo_should_purge_cache(self) -> bool: # If a like coutn was decremented, we need to purge the application cache @@ -971,6 +991,14 @@ def parse_collection(payload: Optional[Dict[str, Any]] = None, url: Optional[str return activitypub_utils.parse_collection(payload, url) +def embed_collection(data): + return { + "type": "Collection", + "totalItems": len(data), + "items": data, + } + + def build_ordered_collection(col, q=None, cursor=None, map_func=None, limit=50, col_name=None): col_name = col_name or col.name if q is None: diff --git a/app.py b/app.py index 6b11aa0..a5f47e2 100644 --- a/app.py +++ b/app.py @@ -35,6 +35,7 @@ import activitypub import config from activitypub import ActivityType from activitypub import clean_activity +from activitypub import embed_collection from utils.content_helper import parse_markdown from config import KEY from config import DB @@ -56,10 +57,13 @@ from utils.key import get_secret_key from utils.webfinger import get_remote_follow_template from utils.webfinger import get_actor_url +from typing import Dict, Any app = Flask(__name__) app.secret_key = get_secret_key('flask') +logger = logging.getLogger(__name__) + # Hook up Flask logging with gunicorn gunicorn_logger = logging.getLogger('gunicorn.error') root_logger = logging.getLogger() @@ -435,6 +439,25 @@ def webfinger(): headers={'Content-Type': 'application/jrd+json; charset=utf-8' if not app.debug else 'application/json'}, ) + +def add_extra_collection(raw_doc: Dict[str, Any]) -> Dict[str, Any]: + if 'col_likes' in raw_doc.get('meta', {}): + col_likes = raw_doc['meta']['col_likes'] + if raw_doc['activity']['type'] == ActivityType.CREATE.value: + raw_doc['activity']['object']['likes'] = embed_collection(col_likes) + if 'col_shares' in raw_doc.get('meta', {}): + col_shares = raw_doc['meta']['col_shares'] + if raw_doc['activity']['type'] == ActivityType.CREATE.value: + raw_doc['activity']['object']['shares'] = embed_collection(col_shares) + + return raw_doc + + +def activity_from_doc(raw_doc: Dict[str, Any]) -> Dict[str, Any]: + raw_doc = add_extra_collection(raw_doc) + return clean_activity(raw_doc['activity']) + + @app.route('/outbox', methods=['GET', 'POST']) def outbox(): if request.method == 'GET': @@ -444,7 +467,7 @@ def outbox(): # FIXME(tsileo): filter deleted, add query support for build_ordered_collection q = { 'meta.deleted': False, - 'type': {'$in': [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]}, + #'type': {'$in': [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]}, } return jsonify(**activitypub.build_ordered_collection( DB.outbox, @@ -477,7 +500,7 @@ def outbox(): @app.route('/outbox/') def outbox_detail(item_id): doc = DB.outbox.find_one({'id': item_id, 'meta.deleted': False}) - return jsonify(**clean_activity(doc['activity'])) + return jsonify(**activity_from_doc(doc)) @app.route('/outbox//activity') @@ -485,10 +508,11 @@ def outbox_activity(item_id): data = DB.outbox.find_one({'id': item_id, 'meta.deleted': False}) if not data: abort(404) - obj = data['activity'] + obj = activity_from_doc(data) if obj['type'] != ActivityType.CREATE.value: abort(404) - return jsonify(**clean_activity(obj['object'])) + return jsonify(**obj['object']) + @app.route('/admin', methods=['GET']) @login_required @@ -597,23 +621,38 @@ def notifications(): cursor=cursor, ) -@app.route('/ui/boost') -@login_required -def ui_boost(): + +@app.route('/api/boost') +@api_required +def api_boost(): oid = request.args.get('id') obj = activitypub.parse_activity(OBJECT_SERVICE.get(oid)) announce = obj.build_announce() announce.post_to_outbox() - return redirect(request.args.get('redirect')) + if request.args.get('redirect'): + return redirect(request.args.get('redirect')) + return Response( + status=201, + headers={'Microblogpub-Created-Activity': announce.id}, + ) -@app.route('/ui/like') -@login_required -def ui_like(): +@app.route('/api/like') +@api_required +def api_like(): + # FIXME(tsileo): ensure a Note and not a Create is given oid = request.args.get('id') obj = activitypub.parse_activity(OBJECT_SERVICE.get(oid)) + if not obj: + raise ValueError(f'unkown {oid} object') like = obj.build_like() like.post_to_outbox() - return redirect(request.args.get('redirect')) + if request.args.get('redirect'): + return redirect(request.args.get('redirect')) + return Response( + status=201, + headers={'Microblogpub-Created-Activity': like.id}, + ) + @app.route('/api/undo', methods=['GET', 'POST']) @api_required @@ -702,19 +741,20 @@ def inbox(): )) data = request.get_json(force=True) - print(data) + logger.debug(f'req_headers={request.headers}') + logger.debug(f'raw_data={data}') try: print(verify_request(ACTOR_SERVICE)) except Exception: - print('failed to verify request, trying to verify the payload by fetching the remote') + logger.exception('failed to verify request, trying to verify the payload by fetching the remote') try: data = OBJECT_SERVICE.get(data['id']) except Exception: - print(f'failed to fetch remote id at {data["id"]}') + logger.exception(f'failed to fetch remote id at {data["id"]}') abort(422) activity = activitypub.parse_activity(data) - print(activity) + logger.debug(f'inbox activity={activity}/{data}') activity.process_from_inbox() return Response( diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index 06f3f05..a834b93 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -21,6 +21,8 @@ services: - mongo - rmq command: 'celery worker -l info -A tasks' + volumes: + - "${CONFIG_DIR}:/app/config" environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 diff --git a/docker-compose.yml b/docker-compose.yml index 74f92d8..c7f07b5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,9 @@ services: - mongo - rmq command: 'celery worker -l info -A tasks' - environment: + volumes: + - "${CONFIG_DIR}:/app/config" + environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 mongo: diff --git a/tasks.py b/tasks.py index 7079491..7e45581 100644 --- a/tasks.py +++ b/tasks.py @@ -15,7 +15,7 @@ from utils.httpsig import HTTPSigAuth from utils.opengraph import fetch_og_metadata -log = logging.getLogger() +log = logging.getLogger(__name__) app = Celery('tasks', broker=os.getenv('MICROBLOGPUB_AMQP_BROKER', 'pyamqp://guest@localhost//')) # app = Celery('tasks', broker='pyamqp://guest@rabbitmq//') SigAuth = HTTPSigAuth(ID+'#main-key', KEY.privkey) @@ -31,7 +31,6 @@ def post_to_inbox(self, payload, to): 'Accept': HEADERS[1], 'User-Agent': USER_AGENT, }) - print(resp) log.info('resp=%s', resp) log.info('resp_body=%s', resp.text) resp.raise_for_status() diff --git a/tests/federation_test.py b/tests/federation_test.py index 971b43c..0b3c2e9 100644 --- a/tests/federation_test.py +++ b/tests/federation_test.py @@ -68,6 +68,20 @@ class Instance(object): time.sleep(self._create_delay) return resp.headers.get('microblogpub-created-activity') + def boost(self, activity_id): + resp = self.session.get(f'{self.host_url}/api/boost', params={'id': activity_id}) + assert resp.status_code == 201 + + time.sleep(self._create_delay) + return resp.headers.get('microblogpub-created-activity') + + def like(self, activity_id): + resp = self.session.get(f'{self.host_url}/api/like', params={'id': activity_id}) + assert resp.status_code == 201 + + time.sleep(self._create_delay) + return resp.headers.get('microblogpub-created-activity') + def undo(self, oid: str) -> None: resp = self.session.get(f'{self.host_url}/api/undo', params={'id': oid}) assert resp.status_code == 201 @@ -97,6 +111,11 @@ class Instance(object): resp.raise_for_status() return resp.json() + def outbox_get(self, aid): + resp = self.session.get(aid.replace(self.docker_url, self.host_url), headers={'Accept': 'application/activity+json'}) + resp.raise_for_status() + return resp.json() + def stream_jsonfeed(self): resp = self.session.get(f'{self.host_url}/api/stream', headers={'Accept': 'application/json'}) resp.raise_for_status() @@ -163,6 +182,7 @@ def test_follow_unfollow(): assert instance2_debug['inbox'] == 2 # An Follow and Undo activity should be there assert instance2_debug['outbox'] == 1 # We've sent a Accept activity + def test_post_content(): instance1, instance2 = _instances() # Instance1 follows instance2 @@ -181,3 +201,125 @@ def test_post_content(): inbox_stream = instance2.stream_jsonfeed() assert len(inbox_stream['items']) == 1 assert inbox_stream['items'][0]['id'] == create_id + + +def test_post_content_and_like(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + + create_id = instance1.new_note('hello') + + # Ensure the post is visible in instance2's stream + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 1 + assert inbox_stream['items'][0]['id'] == create_id + + # Now, instance2 like the note + like_id = instance2.like(f'{create_id}/activity') + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 3 # Follow, Accept and Like + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'likes' in note + assert len(note['likes']['items']) == 1 + assert note['likes']['items'][0]['id'] == like_id + +def test_post_content_and_like_unlike(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + + create_id = instance1.new_note('hello') + + # Ensure the post is visible in instance2's stream + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 1 + assert inbox_stream['items'][0]['id'] == create_id + + # Now, instance2 like the note + like_id = instance2.like(f'{create_id}/activity') + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 3 # Follow, Accept and Like + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'likes' in note + assert len(note['likes']['items']) == 1 + assert note['likes']['items'][0]['id'] == like_id + + instance2.undo(like_id) + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 4 # Follow, Accept and Like and Undo + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'likes' in note + assert len(note['likes']['items']) == 0 + +def test_post_content_and_boost(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + + create_id = instance1.new_note('hello') + + # Ensure the post is visible in instance2's stream + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 1 + assert inbox_stream['items'][0]['id'] == create_id + + # Now, instance2 like the note + boost_id = instance2.boost(f'{create_id}/activity') + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 3 # Follow, Accept and Announce + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'shares' in note + assert len(note['shares']['items']) == 1 + assert note['shares']['items'][0]['id'] == boost_id + + +def test_post_content_and_boost_unboost(): + instance1, instance2 = _instances() + # Instance1 follows instance2 + instance1.follow(instance2) + instance2.follow(instance1) + + create_id = instance1.new_note('hello') + + # Ensure the post is visible in instance2's stream + inbox_stream = instance2.stream_jsonfeed() + assert len(inbox_stream['items']) == 1 + assert inbox_stream['items'][0]['id'] == create_id + + # Now, instance2 like the note + boost_id = instance2.boost(f'{create_id}/activity') + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 3 # Follow, Accept and Announce + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'shares' in note + assert len(note['shares']['items']) == 1 + assert note['shares']['items'][0]['id'] == boost_id + + instance2.undo(boost_id) + + instance1_debug = instance1.debug() + assert instance1_debug['inbox'] == 4 # Follow, Accept and Announce and Undo + assert instance1_debug['outbox'] == 3 # Folllow, Accept, and Create + + note = instance1.outbox_get(f'{create_id}/activity') + assert 'shares' in note + assert len(note['shares']['items']) == 0 diff --git a/utils/httpsig.py b/utils/httpsig.py index 84245fb..f63c12c 100644 --- a/utils/httpsig.py +++ b/utils/httpsig.py @@ -8,6 +8,7 @@ from urllib.parse import urlparse from typing import Any, Dict import base64 import hashlib +import logging from flask import request from requests.auth import AuthBase @@ -15,6 +16,8 @@ from requests.auth import AuthBase from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA256 +logger = logging.getLogger(__name__) + def _build_signed_string(signed_headers: str, method: str, path: str, headers: Any, body_digest: str) -> str: out = [] @@ -51,6 +54,7 @@ def _body_digest() -> str: def verify_request(actor_service) -> bool: hsig = _parse_sig_header(request.headers.get('Signature')) + logger.debug(f'hsig={hsig}') signed_string = _build_signed_string(hsig['headers'], request.method, request.path, request.headers, _body_digest()) _, rk = actor_service.get_public_key(hsig['keyId']) return _verify_h(signed_string, base64.b64decode(hsig['signature']), rk) @@ -62,6 +66,7 @@ class HTTPSigAuth(AuthBase): self.privkey = privkey def __call__(self, r): + logger.info(f'keyid={self.keyid}') host = urlparse(r.url).netloc bh = hashlib.new('sha256') bh.update(r.body.encode('utf-8')) @@ -79,5 +84,6 @@ class HTTPSigAuth(AuthBase): headers = { 'Signature': f'keyId="{self.keyid}",algorithm="rsa-sha256",headers="{sigheaders}",signature="{sig}"' } + logger.info(f'signed request headers={headers}') r.headers.update(headers) return r