diff --git a/activitypub.py b/activitypub.py index cdb0bd1..697ec59 100644 --- a/activitypub.py +++ b/activitypub.py @@ -1,77 +1,29 @@ import logging -import json -import binascii -import os + from datetime import datetime -from enum import Enum from bson.objectid import ObjectId from html2text import html2text from feedgen.feed import FeedGenerator -from utils.actor_service import NotAnActorError -from utils.errors import BadActivityError -from utils.errors import UnexpectedActivityTypeError -from utils.errors import NotFromOutboxError -from utils import activitypub_utils +from little_boxes import activitypub as ap +from little_boxes.backend import Backend +from little_boxes.collection import parse_collection as ap_parse_collection + from config import USERNAME, BASE_URL, ID -from config import CTX_AS, CTX_SECURITY, AS_PUBLIC -from config import DB, ME, ACTOR_SERVICE -from config import OBJECT_SERVICE -from config import PUBLIC_INSTANCES +from config import DB, ME import tasks from typing import List, Optional, Dict, Any, Union logger = logging.getLogger(__name__) -# Helper/shortcut for typing -ObjectType = Dict[str, Any] -ObjectOrIDType = Union[str, ObjectType] - -COLLECTION_CTX = [ - "https://www.w3.org/ns/activitystreams", - "https://w3id.org/security/v1", - { - "Hashtag": "as:Hashtag", - "sensitive": "as:sensitive", - } -] - - -class ActivityType(Enum): - """Supported activity `type`.""" - ANNOUNCE = 'Announce' - BLOCK = 'Block' - LIKE = 'Like' - CREATE = 'Create' - UPDATE = 'Update' - PERSON = 'Person' - ORDERED_COLLECTION = 'OrderedCollection' - ORDERED_COLLECTION_PAGE = 'OrderedCollectionPage' - COLLECTION_PAGE = 'CollectionPage' - COLLECTION = 'Collection' - NOTE = 'Note' - ACCEPT = 'Accept' - REJECT = 'Reject' - FOLLOW = 'Follow' - DELETE = 'Delete' - UNDO = 'Undo' - IMAGE = 'Image' - TOMBSTONE = 'Tombstone' - - -def random_object_id() -> str: - """Generates a random object ID.""" - return binascii.hexlify(os.urandom(8)).decode('utf-8') - - -def _remove_id(doc: ObjectType) -> ObjectType: +def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: """Helper for removing MongoDB's `_id` field.""" doc = doc.copy() - if '_id' in doc: - del(doc['_id']) + if "_id" in doc: + del (doc["_id"]) return doc @@ -82,1125 +34,337 @@ def _to_list(data: Union[List[Any], Any]) -> List[Any]: return [data] -def clean_activity(activity: ObjectType) -> Dict[str, Any]: - """Clean the activity before rendering it. - - Remove the hidden bco and bcc field - """ - for field in ['bto', 'bcc']: - if field in activity: - del(activity[field]) - if activity['type'] == 'Create' and field in activity['object']: - del(activity['object'][field]) - return activity - - -def _get_actor_id(actor: ObjectOrIDType) -> str: - """Helper for retrieving an actor `id`.""" - if isinstance(actor, dict): - return actor['id'] - return actor - - -class BaseActivity(object): - """Base class for ActivityPub activities.""" - - ACTIVITY_TYPE: Optional[ActivityType] = None - ALLOWED_OBJECT_TYPES: List[ActivityType] = [] - OBJECT_REQUIRED = False - - def __init__(self, **kwargs) -> None: - # Ensure the class has an activity type defined - if not self.ACTIVITY_TYPE: - raise BadActivityError('Missing ACTIVITY_TYPE') - - # XXX(tsileo): what to do about this check? 
- # Ensure the activity has a type and a valid one - # if kwargs.get('type') is None: - # raise BadActivityError('missing activity type') - - if kwargs.get('type') and kwargs.pop('type') != self.ACTIVITY_TYPE.value: - raise UnexpectedActivityTypeError(f'Expect the type to be {self.ACTIVITY_TYPE.value!r}') - - # Initialize the object - self._data: Dict[str, Any] = { - 'type': self.ACTIVITY_TYPE.value - } - logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs}') - - if 'id' in kwargs: - self._data['id'] = kwargs.pop('id') - - if self.ACTIVITY_TYPE != ActivityType.PERSON: - actor = kwargs.get('actor') - if actor: - kwargs.pop('actor') - actor = self._validate_person(actor) - self._data['actor'] = actor - else: - # FIXME(tsileo): uses a special method to set the actor as "the instance" - if not self.NO_CONTEXT and self.ACTIVITY_TYPE != ActivityType.TOMBSTONE: - actor = ID - self._data['actor'] = actor - - if 'object' in kwargs: - obj = kwargs.pop('object') - if isinstance(obj, str): - self._data['object'] = obj - else: - if not self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError('unexpected object') - if 'type' not in obj or (self.ACTIVITY_TYPE != ActivityType.CREATE and 'id' not in obj): - raise BadActivityError('invalid object, missing type') - if ActivityType(obj['type']) not in self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError( - f'unexpected object type {obj["type"]} (allowed={self.ALLOWED_OBJECT_TYPES})' - ) - self._data['object'] = obj - - if '@context' not in kwargs: - if not self.NO_CONTEXT: - self._data['@context'] = CTX_AS - else: - self._data['@context'] = kwargs.pop('@context') - - # @context check - if not self.NO_CONTEXT: - if not isinstance(self._data['@context'], list): - self._data['@context'] = [self._data['@context']] - if CTX_SECURITY not in self._data['@context']: - self._data['@context'].append(CTX_SECURITY) - if isinstance(self._data['@context'][-1], dict): - self._data['@context'][-1]['Hashtag'] = 'as:Hashtag' - self._data['@context'][-1]['sensitive'] = 'as:sensitive' - else: - self._data['@context'].append({'Hashtag': 'as:Hashtag', 'sensitive': 'as:sensitive'}) - - allowed_keys = None - try: - allowed_keys = self._init(**kwargs) - logger.debug('calling custom init') - except NotImplementedError: - pass - - if allowed_keys: - # Allows an extra to (like for Accept and Follow) - kwargs.pop('to', None) - if len(set(kwargs.keys()) - set(allowed_keys)) > 0: - raise BadActivityError(f'extra data left: {kwargs!r}') - else: - # Remove keys with `None` value - valid_kwargs = {} - for k, v in kwargs.items(): - if v is None: - continue - valid_kwargs[k] = v - self._data.update(**valid_kwargs) - - def _init(self, **kwargs) -> Optional[List[str]]: - raise NotImplementedError - - def _verify(self) -> None: - raise NotImplementedError - - def verify(self) -> None: - """Verifies that the activity is valid.""" - if self.OBJECT_REQUIRED and 'object' not in self._data: - raise BadActivityError('activity must have an "object"') - - try: - self._verify() - except NotImplementedError: - pass - - def __repr__(self) -> str: - return '{}({!r})'.format(self.__class__.__qualname__, self._data.get('id')) - - def __str__(self) -> str: - return str(self._data.get('id', f'[new {self.ACTIVITY_TYPE} activity]')) - - def __getattr__(self, name: str) -> Any: - if self._data.get(name): - return self._data.get(name) - - @property - def type_enum(self) -> ActivityType: - return ActivityType(self.type) - - def _set_id(self, uri: str, obj_id: str) -> None: - 
raise NotImplementedError - - def set_id(self, uri: str, obj_id: str) -> None: - logger.debug(f'setting ID {uri} / {obj_id}') - self._data['id'] = uri - try: - self._set_id(uri, obj_id) - except NotImplementedError: - pass - - def _actor_id(self, obj: ObjectOrIDType) -> str: - if isinstance(obj, dict) and obj['type'] == ActivityType.PERSON.value: - obj_id = obj.get('id') - if not obj_id: - raise ValueError('missing object id') - return obj_id - else: - return str(obj) - - def _validate_person(self, obj: ObjectOrIDType) -> str: - obj_id = self._actor_id(obj) - try: - actor = ACTOR_SERVICE.get(obj_id) - except Exception: - return obj_id # FIXME(tsileo): handle this - if not actor: - raise ValueError('Invalid actor') - return actor['id'] - - def get_object(self) -> 'BaseActivity': - if self.__obj: - return self.__obj - if isinstance(self._data['object'], dict): - p = parse_activity(self._data['object']) - else: - if self.ACTIVITY_TYPE == ActivityType.FOLLOW: - p = Person(**ACTOR_SERVICE.get(self._data['object'])) - else: - obj = OBJECT_SERVICE.get(self._data['object']) - if ActivityType(obj.get('type')) not in self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")}') - - p = parse_activity(obj) - - self.__obj: Optional[BaseActivity] = p - return p - - def reset_object_cache(self) -> None: - self.__obj = None - - def to_dict(self, embed: bool = False, embed_object_id_only: bool = False) -> ObjectType: - data = dict(self._data) - if embed: - for k in ['@context', 'signature']: - if k in data: - del(data[k]) - if data.get('object') and embed_object_id_only and isinstance(data['object'], dict): - try: - data['object'] = data['object']['id'] - except KeyError: - raise BadActivityError('embedded object does not have an id') - - return data - - def get_actor(self) -> 'BaseActivity': - actor = self._data.get('actor') - if not actor: - if self.type_enum == ActivityType.NOTE: - actor = str(self._data.get('attributedTo')) - else: - raise ValueError('failed to fetch actor') - - actor_id = self._actor_id(actor) - return Person(**ACTOR_SERVICE.get(actor_id)) - - def _pre_post_to_outbox(self) -> None: - raise NotImplementedError - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - raise NotImplementedError - - def _undo_outbox(self) -> None: - raise NotImplementedError - - def _pre_process_from_inbox(self) -> None: - raise NotImplementedError - - def _process_from_inbox(self) -> None: - raise NotImplementedError - - def _undo_inbox(self) -> None: - raise NotImplementedError - - def _undo_should_purge_cache(self) -> bool: - raise NotImplementedError - - def _should_purge_cache(self) -> bool: - raise NotImplementedError - - def process_from_inbox(self) -> None: - logger.debug(f'calling main process from inbox hook for {self}') - self.verify() - actor = self.get_actor() - - # Check for Block activity - if DB.outbox.find_one({'type': ActivityType.BLOCK.value, - 'activity.object': actor.id, - 'meta.undo': False}): - logger.info(f'actor {actor} is blocked, dropping the received activity {self}') - return - - if DB.inbox.find_one({'remote_id': self.id}): - # The activity is already in the inbox - logger.info(f'received duplicate activity {self}, dropping it') - return - - try: - self._pre_process_from_inbox() - logger.debug('called pre process from inbox hook') - except NotImplementedError: - logger.debug('pre process from inbox hook not implemented') - - activity = self.to_dict() - DB.inbox.insert_one({ - 'activity': 
activity, - 'type': self.type, - 'remote_id': self.id, - 'meta': {'undo': False, 'deleted': False}, - }) - logger.info('activity {self} saved') - - try: - self._process_from_inbox() - logger.debug('called process from inbox hook') - except NotImplementedError: - logger.debug('process from inbox hook not implemented') - - def post_to_outbox(self) -> None: - logger.debug(f'calling main post to outbox hook for {self}') - obj_id = random_object_id() - self.set_id(f'{ID}/outbox/{obj_id}', obj_id) - self.verify() - - try: - self._pre_post_to_outbox() - logger.debug(f'called pre post to outbox hook') - except NotImplementedError: - logger.debug('pre post to outbox hook not implemented') - - activity = self.to_dict() - DB.outbox.insert_one({ - 'id': obj_id, - 'activity': activity, - 'type': self.type, - 'remote_id': self.id, - 'meta': {'undo': False, 'deleted': False}, - }) - - recipients = self.recipients() - logger.info(f'recipients={recipients}') - activity = clean_activity(activity) - - try: - self._post_to_outbox(obj_id, activity, recipients) - logger.debug(f'called post to outbox hook') - except NotImplementedError: - logger.debug('post to outbox hook not implemented') - - payload = json.dumps(activity) - for recp in recipients: - logger.debug(f'posting to {recp}') - self._post_to_inbox(payload, recp) - - def _post_to_inbox(self, payload: str, to: str): - tasks.post_to_inbox.delay(payload, to) - - def _recipients(self) -> List[str]: - return [] - - def recipients(self) -> List[str]: - recipients = self._recipients() - - out: List[str] = [] - for recipient in recipients: - if recipient in PUBLIC_INSTANCES: - if recipient not in out: - out.append(str(recipient)) - continue - if recipient in [ME, AS_PUBLIC, None]: - continue - if isinstance(recipient, Person): - if recipient.id == ME: - continue - actor = recipient - else: - try: - actor = Person(**ACTOR_SERVICE.get(recipient)) - - if actor.endpoints: - shared_inbox = actor.endpoints.get('sharedInbox') - if shared_inbox not in out: - out.append(shared_inbox) - continue - - if actor.inbox and actor.inbox not in out: - out.append(actor.inbox) - - except NotAnActorError as error: - # Is the activity a `Collection`/`OrderedCollection`? 
- if error.activity and error.activity['type'] in [ActivityType.COLLECTION.value, - ActivityType.ORDERED_COLLECTION.value]: - for item in parse_collection(error.activity): - if item in [ME, AS_PUBLIC]: - continue - try: - col_actor = Person(**ACTOR_SERVICE.get(item)) - except NotAnActorError: - pass - - if col_actor.endpoints: - shared_inbox = col_actor.endpoints.get('sharedInbox') - if shared_inbox not in out: - out.append(shared_inbox) - continue - if col_actor.inbox and col_actor.inbox not in out: - out.append(col_actor.inbox) - - return out - - def build_undo(self) -> 'BaseActivity': - raise NotImplementedError - - def build_delete(self) -> 'BaseActivity': - raise NotImplementedError - - -class Person(BaseActivity): - ACTIVITY_TYPE = ActivityType.PERSON - - def _init(self, **kwargs): - # if 'icon' in kwargs: - # self._data['icon'] = Image(**kwargs.pop('icon')) - pass - - def _verify(self) -> None: - ACTOR_SERVICE.get(self._data['id']) - - -class Block(BaseActivity): - ACTIVITY_TYPE = ActivityType.BLOCK - OBJECT_REQUIRED = True - - -class Collection(BaseActivity): - ACTIVITY_TYPE = ActivityType.COLLECTION - - -class Image(BaseActivity): - ACTIVITY_TYPE = ActivityType.IMAGE - NO_CONTEXT = True - - def _init(self, **kwargs): - self._data.update( - url=kwargs.pop('url'), +class MicroblogPubBackend(Backend): + def base_url(self) -> str: + return BASE_URL + + def activity_url(self, obj_id): + return f"{BASE_URL}/outbox/{obj_id}" + + def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: + DB.outbox.insert_one( + { + "activity": activity.to_dict(), + "type": activity.type, + "remote_id": activity.id, + "meta": {"undo": False, "deleted": False}, + } ) - def __repr__(self): - return 'Image({!r})'.format(self._data.get('url')) - - -class Follow(BaseActivity): - ACTIVITY_TYPE = ActivityType.FOLLOW - ALLOWED_OBJECT_TYPES = [ActivityType.PERSON] - OBJECT_REQUIRED = True - - def _build_reply(self, reply_type: ActivityType) -> BaseActivity: - if reply_type == ActivityType.ACCEPT: - return Accept( - object=self.to_dict(embed=True), + def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool: + return bool( + DB.outbox.find_one( + { + "type": ap.ActivityType.BLOCK.value, + "activity.object": as_actor.id, + "meta.undo": False, + } ) + ) - raise ValueError(f'type {reply_type} is invalid for building a reply') + def fetch_iri(self, iri: str) -> ap.ObjectType: + pass - def _recipients(self) -> List[str]: - return [self.get_object().id] + def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool: + return bool(DB.inbox.find_one({"remote_id": iri})) - def _process_from_inbox(self) -> None: - accept = self.build_accept() - accept.post_to_outbox() + def inbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: + DB.inbox.insert_one( + { + "activity": activity.to_dict(), + "type": activity.type, + "remote_id": activity.id, + "meta": {"undo": False, "deleted": False}, + } + ) - remote_actor = self.get_actor().id + def post_to_remote_inbox(self, as_actor: ap.Person, payload: str, to: str) -> None: + tasks.post_to_inbox.delay(payload, to) - if DB.followers.find({'remote_actor': remote_actor}).count() == 0: - DB.followers.insert_one({'remote_actor': remote_actor}) + def new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: + remote_actor = follow.get_actor().id - def _undo_inbox(self) -> None: - DB.followers.delete_one({'remote_actor': self.get_actor().id}) + if DB.followers.find({"remote_actor": remote_actor}).count() == 0: + 
DB.followers.insert_one({"remote_actor": remote_actor}) - def _undo_outbox(self) -> None: - DB.following.delete_one({'remote_actor': self.get_object().id}) + def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: + # TODO(tsileo): update the follow to set undo + DB.followers.delete_one({"remote_actor": follow.get_actor().id}) - def build_accept(self) -> BaseActivity: - return self._build_reply(ActivityType.ACCEPT) + def undo_new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None: + # TODO(tsileo): update the follow to set undo + DB.following.delete_one({"remote_actor": follow.get_object().id}) - def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True)) + def new_following(self, as_actor: ap.Person, follow: ap.Follow) -> None: + remote_actor = follow.get_actor().id + if DB.following.find({"remote_actor": remote_actor}).count() == 0: + DB.following.insert_one({"remote_actor": remote_actor}) - def _should_purge_cache(self) -> bool: - # Receiving a follow activity in the inbox should reset the application cache - return True - - -class Accept(BaseActivity): - ACTIVITY_TYPE = ActivityType.ACCEPT - ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW] - - def _recipients(self) -> List[str]: - return [self.get_object().get_actor().id] - - def _process_from_inbox(self) -> None: - remote_actor = self.get_actor().id - if DB.following.find({'remote_actor': remote_actor}).count() == 0: - DB.following.insert_one({'remote_actor': remote_actor}) - - def _should_purge_cache(self) -> bool: - # Receiving an accept activity in the inbox should reset the application cache - # (a follow request has been accepted) - return True - - -class Undo(BaseActivity): - ACTIVITY_TYPE = ActivityType.UNDO - ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW, ActivityType.LIKE, ActivityType.ANNOUNCE] - OBJECT_REQUIRED = True - - def _recipients(self) -> List[str]: - obj = self.get_object() - if obj.type_enum == ActivityType.FOLLOW: - return [obj.get_object().id] - else: - return [obj.get_object().get_actor().id] - # TODO(tsileo): handle like and announce - raise Exception('TODO') - - def _pre_process_from_inbox(self) -> None: - """Ensures an Undo activity comes from the same actor as the updated activity.""" - obj = self.get_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot update {obj!r}') - - def _process_from_inbox(self) -> None: - obj = self.get_object() - DB.inbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}}) - - try: - obj._undo_inbox() - except NotImplementedError: - pass - - def _should_purge_cache(self) -> bool: - obj = self.get_object() - try: - # Receiving a undo activity regarding an activity that was mentioning a published activity - # should purge the cache - return obj._undo_should_purge_cache() - except NotImplementedError: - pass - - return False - - def _pre_post_to_outbox(self) -> None: - """Ensures an Undo activity references an activity owned by the instance.""" - obj = self.get_object() - if not obj.id.startswith(ID): - raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - logger.debug('processing undo to outbox') - logger.debug('self={}'.format(self)) - obj = self.get_object() - logger.debug('obj={}'.format(obj)) - DB.outbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}}) - - try: - obj._undo_outbox() - logger.debug(f'_undo_outbox 
called for {obj}') - except NotImplementedError: - logger.debug(f'_undo_outbox not implemented for {obj}') - pass - - -class Like(BaseActivity): - ACTIVITY_TYPE = ActivityType.LIKE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - OBJECT_REQUIRED = True - - def _recipients(self) -> List[str]: - return [self.get_object().get_actor().id] - - def _process_from_inbox(self): - obj = self.get_object() + def inbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: + obj = like.get_object() # Update the meta counter if the object is published by the server - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_like': 1}, - }) - # XXX(tsileo): notification?? + DB.outbox.update_one( + {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}} + ) - def _undo_inbox(self) -> None: - obj = self.get_object() + def inbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: + obj = like.get_object() # Update the meta counter if the object is published by the server - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_like': -1}, - }) + DB.outbox.update_one( + {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}} + ) - def _undo_should_purge_cache(self) -> bool: - # If a like coutn was decremented, we need to purge the application cache - return self.get_object().id.startswith(BASE_URL) - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]): - obj = self.get_object() + def outobx_like(self, as_actor: ap.Person, like: ap.Like) -> None: + obj = like.get_object() # Unlikely, but an actor can like it's own post - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_like': 1}, - }) + DB.outbox.update_one( + {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": 1}} + ) # Keep track of the like we just performed - DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': obj_id}}) + DB.inbox.update_one( + {"activity.object.id": obj.id}, {"$set": {"meta.liked": like.id}} + ) - def _undo_outbox(self) -> None: - obj = self.get_object() + def outbox_undo_like(self, as_actor: ap.Person, like: ap.Like) -> None: + obj = like.get_object() # Unlikely, but an actor can like it's own post - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_like': -1}, - }) + DB.outbox.update_one( + {"activity.object.id": obj.id}, {"$inc": {"meta.count_like": -1}} + ) - DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': False}}) + DB.inbox.update_one( + {"activity.object.id": obj.id}, {"$set": {"meta.liked": False}} + ) - def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True, embed_object_id_only=True)) - - -class Announce(BaseActivity): - ACTIVITY_TYPE = ActivityType.ANNOUNCE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - - def _recipients(self) -> List[str]: - recipients = [] - - for field in ['to', 'cc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - return recipients - - def _process_from_inbox(self) -> None: - if isinstance(self._data['object'], str) and not self._data['object'].startswith('http'): + def inbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: + if isinstance(announce._data["object"], str) and not announce._data[ + "object" + ].startswith("http"): # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else logger.warn( - f'received an Annouce referencing an OStatus notice 
({self._data["object"]}), dropping the message' + f'received an Annouce referencing an OStatus notice ({announce._data["object"]}), dropping the message' ) return - # Save/cache the object, and make it part of the stream so we can fetch it - if isinstance(self._data['object'], str): - raw_obj = OBJECT_SERVICE.get( - self._data['object'], - reload_cache=True, - part_of_stream=True, - announce_published=self._data['published'], - ) - obj = parse_activity(raw_obj) + # FIXME(tsileo): Save/cache the object, and make it part of the stream so we can fetch it + if isinstance(announce._data["object"], str): + obj_iri = announce._data["object"] else: - obj = self.get_object() + obj_iri = self.get_object().id - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_boost': 1}, - }) + DB.outbox.update_one( + {"activity.object.id": obj_iri}, {"$inc": {"meta.count_boost": 1}} + ) - def _undo_inbox(self) -> None: - obj = self.get_object() + def inbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: + obj = announce.get_object() # Update the meta counter if the object is published by the server - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$inc': {'meta.count_boost': -1}, - }) + DB.outbox.update_one( + {"activity.object.id": obj.id}, {"$inc": {"meta.count_boost": -1}} + ) - def _undo_should_purge_cache(self) -> bool: - # If a like coutn was decremented, we need to purge the application cache - return self.get_object().id.startswith(BASE_URL) + def outbox_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: + obj = announce.get_object() + DB.inbox.update_one( + {"activity.object.id": obj.id}, {"$set": {"meta.boosted": announce.id}} + ) - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - if isinstance(self._data['object'], str): - # Put the object in the cache - OBJECT_SERVICE.get( - self._data['object'], - reload_cache=True, - part_of_stream=True, - announce_published=self._data['published'], - ) + def outbox_undo_announce(self, as_actor: ap.Person, announce: ap.Announce) -> None: + obj = announce.get_object() + DB.inbox.update_one( + {"activity.object.id": obj.id}, {"$set": {"meta.boosted": False}} + ) - obj = self.get_object() - DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': obj_id}}) - - def _undo_outbox(self) -> None: - obj = self.get_object() - DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': False}}) - - def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True)) - - -class Delete(BaseActivity): - ACTIVITY_TYPE = ActivityType.DELETE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.TOMBSTONE] - OBJECT_REQUIRED = True - - def _get_actual_object(self) -> BaseActivity: - obj = self.get_object() - if obj.type_enum == ActivityType.TOMBSTONE: - obj = parse_activity(OBJECT_SERVICE.get(obj.id)) - return obj - - def _recipients(self) -> List[str]: - obj = self._get_actual_object() - return obj._recipients() - - def _pre_process_from_inbox(self) -> None: - """Ensures a Delete activity comes from the same actor as the deleted activity.""" - obj = self._get_actual_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot delete {obj!r}') - - def _process_from_inbox(self) -> None: - DB.inbox.update_one({'activity.object.id': self.get_object().id}, {'$set': {'meta.deleted': True}}) - obj = self._get_actual_object() - if obj.type_enum == 
ActivityType.NOTE: - obj._delete_from_threads() + def inbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: + DB.inbox.update_one( + {"activity.object.id": delete.get_object().id}, + {"$set": {"meta.deleted": True}}, + ) + # FIXME(tsileo): handle threads + # obj = delete._get_actual_object() + # if obj.type_enum == ActivityType.NOTE: + # obj._delete_from_threads() # TODO(tsileo): also purge the cache if it's a reply of a published activity - def _pre_post_to_outbox(self) -> None: - """Ensures the Delete activity references a activity from the outbox (i.e. owned by the instance).""" - obj = self._get_actual_object() - if not obj.id.startswith(ID): - raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') + def outbox_delete(self, as_actor: ap.Person, delete: ap.Delete) -> None: + DB.outbox.update_one( + {"activity.object.id": delete.get_object().id}, + {"$set": {"meta.deleted": True}}, + ) - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - DB.outbox.update_one({'activity.object.id': self.get_object().id}, {'$set': {'meta.deleted': True}}) - - -class Update(BaseActivity): - ACTIVITY_TYPE = ActivityType.UPDATE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.PERSON] - OBJECT_REQUIRED = True - - def _pre_process_from_inbox(self) -> None: - """Ensures an Update activity comes from the same actor as the updated activity.""" - obj = self.get_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot update {obj!r}') - - def _process_from_inbox(self): - obj = self.get_object() - if obj.type_enum == ActivityType.NOTE: - DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'activity.object': obj.to_dict()}}) + def inbox_update(self, as_actor: ap.Person, update: ap.Update) -> None: + obj = update.get_object() + if obj.ACTIVITY_TYPE == ap.ActivityType.NOTE: + DB.inbox.update_one( + {"activity.object.id": obj.id}, + {"$set": {"activity.object": obj.to_dict()}}, + ) return - # If the object is a Person, it means the profile was updated, we just refresh our local cache - ACTOR_SERVICE.get(obj.id, reload_cache=True) + # FIXME(tsileo): handle update actor amd inbox_update_note/inbox_update_actor - # TODO(tsileo): implements _should_purge_cache if it's a reply of a published activity (i.e. in the outbox) + def outbox_update(self, as_actor: ap.Person, update: ap.Update) -> None: + obj = update._data["object"] - def _pre_post_to_outbox(self) -> None: - obj = self.get_object() - if not obj.id.startswith(ID): - raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - obj = self._data['object'] - - update_prefix = 'activity.object.' - update: Dict[str, Any] = {'$set': dict(), '$unset': dict()} - update['$set'][f'{update_prefix}updated'] = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z' + update_prefix = "activity.object." 
+ update: Dict[str, Any] = {"$set": dict(), "$unset": dict()} + update["$set"][f"{update_prefix}updated"] = ( + datetime.utcnow().replace(microsecond=0).isoformat() + "Z" + ) for k, v in obj.items(): - if k in ['id', 'type']: + if k in ["id", "type"]: continue if v is None: - update['$unset'][f'{update_prefix}{k}'] = '' + update["$unset"][f"{update_prefix}{k}"] = "" else: - update['$set'][f'{update_prefix}{k}'] = v + update["$set"][f"{update_prefix}{k}"] = v - if len(update['$unset']) == 0: - del(update['$unset']) + if len(update["$unset"]) == 0: + del (update["$unset"]) - print(f'updating note from outbox {obj!r} {update}') - logger.info(f'updating note from outbox {obj!r} {update}') - DB.outbox.update_one({'activity.object.id': obj['id']}, update) + print(f"updating note from outbox {obj!r} {update}") + logger.info(f"updating note from outbox {obj!r} {update}") + DB.outbox.update_one({"activity.object.id": obj["id"]}, update) # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients # (create a new Update with the result of the update, and send it without saving it?) -class Create(BaseActivity): - ACTIVITY_TYPE = ActivityType.CREATE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - OBJECT_REQUIRED = True - - def _set_id(self, uri: str, obj_id: str) -> None: - self._data['object']['id'] = uri + '/activity' - self._data['object']['url'] = ID + '/' + self.get_object().type.lower() + '/' + obj_id - self.reset_object_cache() - - def _init(self, **kwargs): - obj = self.get_object() - if not obj.attributedTo: - self._data['object']['attributedTo'] = self.get_actor().id - if not obj.published: - if self.published: - self._data['object']['published'] = self.published - else: - now = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z' - self._data['published'] = now - self._data['object']['published'] = now - - def _recipients(self) -> List[str]: - # TODO(tsileo): audience support? - recipients = [] - for field in ['to', 'cc', 'bto', 'bcc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - recipients.extend(self.get_object()._recipients()) - - return recipients - - def _update_threads(self) -> None: - logger.debug('_update_threads hook') - obj = self.get_object() - - # TODO(tsileo): re-enable me - # tasks.fetch_og.delay('INBOX', self.id) - - threads = [] - reply = obj.get_local_reply() - print(f'initial_reply={reply}') - print(f'{obj}') - logger.debug(f'initial_reply={reply}') - reply_id = None - direct_reply = 1 - while reply is not None: - if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, { - '$inc': { - 'meta.count_reply': 1, - 'meta.count_direct_reply': direct_reply, - }, - '$addToSet': {'meta.thread_children': obj.id}, - }): - DB.outbox.update_one({'activity.object.id': reply.id}, { - '$inc': { - 'meta.count_reply': 1, - 'meta.count_direct_reply': direct_reply, - }, - '$addToSet': {'meta.thread_children': obj.id}, - }) - - direct_reply = 0 - reply_id = reply.id - reply = reply.get_local_reply() - logger.debug(f'next_reply={reply}') - threads.append(reply_id) - # FIXME(tsileo): obj.id is None!! 
- print(f'reply_id={reply_id} {obj.id} {obj._data} {self.id}') - - if reply_id: - if not DB.inbox.find_one_and_update({'activity.object.id': obj.id}, { - '$set': { - 'meta.thread_parents': threads, - 'meta.thread_root_parent': reply_id, - }, - }): - DB.outbox.update_one({'activity.object.id': obj.id}, { - '$set': { - 'meta.thread_parents': threads, - 'meta.thread_root_parent': reply_id, - }, - }) - logger.debug('_update_threads done') - - def _process_from_inbox(self) -> None: - self._update_threads() - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - self._update_threads() - - def _should_purge_cache(self) -> bool: - # TODO(tsileo): handle reply of a reply... - obj = self.get_object() - in_reply_to = obj.inReplyTo - if in_reply_to: - local_activity = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) - if local_activity: - return True - - return False - - -class Tombstone(BaseActivity): - ACTIVITY_TYPE = ActivityType.TOMBSTONE - - -class Note(BaseActivity): - ACTIVITY_TYPE = ActivityType.NOTE - - def _init(self, **kwargs): - print(self._data) - # Remove the `actor` field as `attributedTo` is used for `Note` instead - if 'actor' in self._data: - del(self._data['actor']) - if 'sensitive' not in kwargs: - self._data['sensitive'] = False - - def _recipients(self) -> List[str]: - # TODO(tsileo): audience support? - recipients: List[str] = [] - - # If the note is public, we publish it to the defined "public instances" - if AS_PUBLIC in self._data.get('to', []): - recipients.extend(PUBLIC_INSTANCES) - print('publishing to public instances') - print(recipients) - - for field in ['to', 'cc', 'bto', 'bcc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - return recipients - - def _delete_from_threads(self) -> None: - logger.debug('_delete_from_threads hook') - - reply = self.get_local_reply() - logger.debug(f'initial_reply={reply}') - direct_reply = -1 - while reply is not None: - if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, { - '$inc': { - 'meta.count_reply': -1, - 'meta.count_direct_reply': direct_reply, - }, - '$pull': {'meta.thread_children': self.id}, - - }): - DB.outbox.update_one({'activity.object.id': reply.id}, { - '$inc': { - 'meta.count_reply': 1, - 'meta.count_direct_reply': direct_reply, - }, - '$pull': {'meta.thread_children': self.id}, - }) - - direct_reply = 0 - reply = reply.get_local_reply() - logger.debug(f'next_reply={reply}') - - logger.debug('_delete_from_threads done') - return None - - def get_local_reply(self) -> Optional[BaseActivity]: - "Find the note reply if any.""" - in_reply_to = self.inReplyTo - if not in_reply_to: - # This is the root comment - return None - - inbox_parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) - if inbox_parent: - return parse_activity(inbox_parent['activity']['object']) - - outbox_parent = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to}) - if outbox_parent: - return parse_activity(outbox_parent['activity']['object']) - - # The parent is no stored on this instance - return None - - def build_create(self) -> BaseActivity: - """Wraps an activity in a Create activity.""" - create_payload = { - 'object': self.to_dict(embed=True), - 'actor': self.attributedTo or ME, - } - for field in ['published', 'to', 'bto', 'cc', 'bcc', 'audience']: - if field in self._data: - create_payload[field] = self._data[field] - - return Create(**create_payload) - 
- def build_like(self) -> BaseActivity: - return Like(object=self.id) - - def build_announce(self) -> BaseActivity: - return Announce( - object=self.id, - to=[AS_PUBLIC], - cc=[ID+'/followers', self.attributedTo], - published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z', - ) - - def build_delete(self) -> BaseActivity: - return Delete(object=Tombstone(id=self.id).to_dict(embed=True)) - - def get_tombstone(self, deleted: Optional[str]) -> BaseActivity: - return Tombstone( - id=self.id, - published=self.published, - deleted=deleted, - updated=deleted, - ) - - -_ACTIVITY_TYPE_TO_CLS = { - ActivityType.IMAGE: Image, - ActivityType.PERSON: Person, - ActivityType.FOLLOW: Follow, - ActivityType.ACCEPT: Accept, - ActivityType.UNDO: Undo, - ActivityType.LIKE: Like, - ActivityType.ANNOUNCE: Announce, - ActivityType.UPDATE: Update, - ActivityType.DELETE: Delete, - ActivityType.CREATE: Create, - ActivityType.NOTE: Note, - ActivityType.BLOCK: Block, - ActivityType.COLLECTION: Collection, - ActivityType.TOMBSTONE: Tombstone, -} - - -def parse_activity(payload: ObjectType, expected: Optional[ActivityType] = None) -> BaseActivity: - t = ActivityType(payload['type']) - - if expected and t != expected: - raise UnexpectedActivityTypeError(f'expected a {expected.name} activity, got a {payload["type"]}') - - if t not in _ACTIVITY_TYPE_TO_CLS: - raise BadActivityError(f'unsupported activity type {payload["type"]}') - - activity = _ACTIVITY_TYPE_TO_CLS[t](**payload) - - return activity - - def gen_feed(): fg = FeedGenerator() - fg.id(f'{ID}') - fg.title(f'{USERNAME} notes') - fg.author({'name': USERNAME, 'email': 't@a4.io'}) - fg.link(href=ID, rel='alternate') - fg.description(f'{USERNAME} notes') - fg.logo(ME.get('icon', {}).get('url')) - fg.language('en') - for item in DB.outbox.find({'type': 'Create'}, limit=50): + fg.id(f"{ID}") + fg.title(f"{USERNAME} notes") + fg.author({"name": USERNAME, "email": "t@a4.io"}) + fg.link(href=ID, rel="alternate") + fg.description(f"{USERNAME} notes") + fg.logo(ME.get("icon", {}).get("url")) + fg.language("en") + for item in DB.outbox.find({"type": "Create"}, limit=50): fe = fg.add_entry() - fe.id(item['activity']['object'].get('url')) - fe.link(href=item['activity']['object'].get('url')) - fe.title(item['activity']['object']['content']) - fe.description(item['activity']['object']['content']) + fe.id(item["activity"]["object"].get("url")) + fe.link(href=item["activity"]["object"].get("url")) + fe.title(item["activity"]["object"]["content"]) + fe.description(item["activity"]["object"]["content"]) return fg def json_feed(path: str) -> Dict[str, Any]: """JSON Feed (https://jsonfeed.org/) document.""" data = [] - for item in DB.outbox.find({'type': 'Create'}, limit=50): - data.append({ - "id": item["id"], - "url": item['activity']['object'].get('url'), - "content_html": item['activity']['object']['content'], - "content_text": html2text(item['activity']['object']['content']), - "date_published": item['activity']['object'].get('published'), - }) + for item in DB.outbox.find({"type": "Create"}, limit=50): + data.append( + { + "id": item["id"], + "url": item["activity"]["object"].get("url"), + "content_html": item["activity"]["object"]["content"], + "content_text": html2text(item["activity"]["object"]["content"]), + "date_published": item["activity"]["object"].get("published"), + } + ) return { "version": "https://jsonfeed.org/version/1", - "user_comment": ("This is a microblog feed. 
You can add this to your feed reader using the following URL: " - + ID + path), + "user_comment": ( + "This is a microblog feed. You can add this to your feed reader using the following URL: " + + ID + + path + ), "title": USERNAME, "home_page_url": ID, "feed_url": ID + path, "author": { "name": USERNAME, "url": ID, - "avatar": ME.get('icon', {}).get('url'), + "avatar": ME.get("icon", {}).get("url"), }, "items": data, } -def build_inbox_json_feed(path: str, request_cursor: Optional[str] = None) -> Dict[str, Any]: +def build_inbox_json_feed( + path: str, request_cursor: Optional[str] = None +) -> Dict[str, Any]: data = [] cursor = None - q: Dict[str, Any] = {'type': 'Create', 'meta.deleted': False} + q: Dict[str, Any] = {"type": "Create", "meta.deleted": False} if request_cursor: - q['_id'] = {'$lt': request_cursor} + q["_id"] = {"$lt": request_cursor} - for item in DB.inbox.find(q, limit=50).sort('_id', -1): - actor = ACTOR_SERVICE.get(item['activity']['actor']) - data.append({ - "id": item["activity"]["id"], - "url": item['activity']['object'].get('url'), - "content_html": item['activity']['object']['content'], - "content_text": html2text(item['activity']['object']['content']), - "date_published": item['activity']['object'].get('published'), - "author": { - "name": actor.get('name', actor.get('preferredUsername')), - "url": actor.get('url'), - 'avatar': actor.get('icon', {}).get('url'), - }, - }) - cursor = str(item['_id']) + for item in DB.inbox.find(q, limit=50).sort("_id", -1): + actor = ap.get_backend().fetch_iri(item["activity"]["actor"]) + data.append( + { + "id": item["activity"]["id"], + "url": item["activity"]["object"].get("url"), + "content_html": item["activity"]["object"]["content"], + "content_text": html2text(item["activity"]["object"]["content"]), + "date_published": item["activity"]["object"].get("published"), + "author": { + "name": actor.get("name", actor.get("preferredUsername")), + "url": actor.get("url"), + "avatar": actor.get("icon", {}).get("url"), + }, + } + ) + cursor = str(item["_id"]) resp = { "version": "https://jsonfeed.org/version/1", - "title": f'{USERNAME}\'s stream', + "title": f"{USERNAME}'s stream", "home_page_url": ID, "feed_url": ID + path, "items": data, } if cursor and len(data) == 50: - resp['next_url'] = ID + path + '?cursor=' + cursor + resp["next_url"] = ID + path + "?cursor=" + cursor return resp -def parse_collection(payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None) -> List[str]: +def parse_collection( + payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None +) -> List[str]: """Resolve/fetch a `Collection`/`OrderedCollection`.""" # Resolve internal collections via MongoDB directly - if url == ID + '/followers': - return [doc['remote_actor'] for doc in DB.followers.find()] - elif url == ID + '/following': - return [doc['remote_actor'] for doc in DB.following.find()] + if url == ID + "/followers": + return [doc["remote_actor"] for doc in DB.followers.find()] + elif url == ID + "/following": + return [doc["remote_actor"] for doc in DB.following.find()] # Go through all the pages - return activitypub_utils.parse_collection(payload, url) + return ap_parse_collection(payload, url) def embed_collection(total_items, first_page_id): return { - "type": ActivityType.ORDERED_COLLECTION.value, + "type": ap.ActivityType.ORDERED_COLLECTION.value, "totalItems": total_items, - "first": f'{first_page_id}?page=first', + "first": f"{first_page_id}?page=first", "id": first_page_id, } -def build_ordered_collection(col, q=None, 
cursor=None, map_func=None, limit=50, col_name=None, first_page=False): +def build_ordered_collection( + col, q=None, cursor=None, map_func=None, limit=50, col_name=None, first_page=False +): col_name = col_name or col.name if q is None: q = {} if cursor: - q['_id'] = {'$lt': ObjectId(cursor)} - data = list(col.find(q, limit=limit).sort('_id', -1)) + q["_id"] = {"$lt": ObjectId(cursor)} + data = list(col.find(q, limit=limit).sort("_id", -1)) if not data: return { - 'id': BASE_URL + '/' + col_name, - 'totalItems': 0, - 'type': ActivityType.ORDERED_COLLECTION.value, - 'orederedItems': [], + "id": BASE_URL + "/" + col_name, + "totalItems": 0, + "type": ap.ActivityType.ORDERED_COLLECTION.value, + "orederedItems": [], } - start_cursor = str(data[0]['_id']) - next_page_cursor = str(data[-1]['_id']) + start_cursor = str(data[0]["_id"]) + next_page_cursor = str(data[-1]["_id"]) total_items = col.find(q).count() data = [_remove_id(doc) for doc in data] @@ -1210,41 +374,43 @@ def build_ordered_collection(col, q=None, cursor=None, map_func=None, limit=50, # No cursor, this is the first page and we return an OrderedCollection if not cursor: resp = { - '@context': COLLECTION_CTX, - 'id': f'{BASE_URL}/{col_name}', - 'totalItems': total_items, - 'type': ActivityType.ORDERED_COLLECTION.value, - 'first': { - 'id': f'{BASE_URL}/{col_name}?cursor={start_cursor}', - 'orderedItems': data, - 'partOf': f'{BASE_URL}/{col_name}', - 'totalItems': total_items, - 'type': ActivityType.ORDERED_COLLECTION_PAGE.value, + "@context": ap.COLLECTION_CTX, + "id": f"{BASE_URL}/{col_name}", + "totalItems": total_items, + "type": ap.ActivityType.ORDERED_COLLECTION.value, + "first": { + "id": f"{BASE_URL}/{col_name}?cursor={start_cursor}", + "orderedItems": data, + "partOf": f"{BASE_URL}/{col_name}", + "totalItems": total_items, + "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value, }, } if len(data) == limit: - resp['first']['next'] = BASE_URL + '/' + col_name + '?cursor=' + next_page_cursor + resp["first"]["next"] = ( + BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor + ) if first_page: - return resp['first'] + return resp["first"] return resp # If there's a cursor, then we return an OrderedCollectionPage resp = { - '@context': COLLECTION_CTX, - 'type': ActivityType.ORDERED_COLLECTION_PAGE.value, - 'id': BASE_URL + '/' + col_name + '?cursor=' + start_cursor, - 'totalItems': total_items, - 'partOf': BASE_URL + '/' + col_name, - 'orderedItems': data, + "@context": ap.COLLECTION_CTX, + "type": ap.ActivityType.ORDERED_COLLECTION_PAGE.value, + "id": BASE_URL + "/" + col_name + "?cursor=" + start_cursor, + "totalItems": total_items, + "partOf": BASE_URL + "/" + col_name, + "orderedItems": data, } if len(data) == limit: - resp['next'] = BASE_URL + '/' + col_name + '?cursor=' + next_page_cursor + resp["next"] = BASE_URL + "/" + col_name + "?cursor=" + next_page_cursor if first_page: - return resp['first'] + return resp["first"] # XXX(tsileo): implements prev with prev=? 
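For context, here is a rough sketch of how the new `MicroblogPubBackend` above is meant to plug into little_boxes, following the usage example in the little_boxes README that is removed further down in this patch. The `use_backend`/`Outbox`/`Follow` calls come from that README and may not match the final little_boxes API exactly; constructing the actor with `Person(**ME)` and the example follow target are assumptions for illustration only.

```python
# Hedged sketch (not part of the patch): wiring the MongoDB-backed backend
# into little_boxes, adapted from the example in the little_boxes README
# removed later in this diff.
from little_boxes import activitypub as ap

from activitypub import MicroblogPubBackend
from config import ME

ap.use_backend(MicroblogPubBackend())  # register the backend globally

me = ap.Person(**ME)    # the instance's actor, built from config.ME (assumption)
outbox = ap.Outbox(me)

follow = ap.Follow(actor=me.id, object="https://remote.example/actor")
outbox.post(follow)     # persisted via MicroblogPubBackend.outbox_new()
                        # and delivered via post_to_remote_inbox()
```

With this in place, the hooks defined above (`new_following`, `outbox_is_blocked`, `inbox_like`, ...) are invoked by little_boxes instead of the old per-activity classes that this diff deletes.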
diff --git a/app.py b/app.py index e3cd805..6b104d2 100644 --- a/app.py +++ b/app.py @@ -67,7 +67,7 @@ from utils.errors import ActivityNotFoundError from typing import Dict, Any - + app = Flask(__name__) app.secret_key = get_secret_key('flask') app.config.update( @@ -137,23 +137,23 @@ def clean_html(html): return bleach.clean(html, tags=ALLOWED_TAGS) -@app.template_filter() -def quote_plus(t): - return urllib.parse.quote_plus(t) +@app.template_filter() +def quote_plus(t): + return urllib.parse.quote_plus(t) -@app.template_filter() -def is_from_outbox(t): +@app.template_filter() +def is_from_outbox(t): return t.startswith(ID) -@app.template_filter() -def clean(html): - return clean_html(html) +@app.template_filter() +def clean(html): + return clean_html(html) -@app.template_filter() -def html2plaintext(body): +@app.template_filter() +def html2plaintext(body): return html2text(body) @@ -183,7 +183,7 @@ def format_timeago(val): return timeago.format(datetime.strptime(val, '%Y-%m-%dT%H:%M:%SZ'), datetime.utcnow()) except: return timeago.format(datetime.strptime(val, '%Y-%m-%dT%H:%M:%S.%fZ'), datetime.utcnow()) - + return val def _is_img(filename): @@ -279,7 +279,7 @@ def handle_activitypub_error(error): return response -# App routes +# App routes ####### # Login @@ -487,7 +487,7 @@ def _build_thread(data, include_children=True): def _flatten(node, level=0): node['_level'] = level thread.append(node) - + for snode in sorted(idx[node['activity']['object']['id']]['_nodes'], key=lambda d: d['activity']['object']['published']): _flatten(snode, level=level+1) _flatten(idx[root_id]) @@ -495,10 +495,10 @@ def _build_thread(data, include_children=True): return thread -@app.route('/note/') -def note_by_id(note_id): +@app.route('/note/') +def note_by_id(note_id): data = DB.outbox.find_one({'id': note_id}) - if not data: + if not data: abort(404) if data['meta'].get('deleted', False): abort(410) @@ -511,7 +511,7 @@ def note_by_id(note_id): '$or': [{'activity.object.id': data['activity']['object']['id']}, {'activity.object': data['activity']['object']['id']}], })) - likes = [ACTOR_SERVICE.get(doc['activity']['actor']) for doc in likes] + likes = [ACTOR_SERVICE.get(doc['activity']['actor']) for doc in likes] shares = list(DB.inbox.find({ 'meta.undo': False, @@ -519,7 +519,7 @@ def note_by_id(note_id): '$or': [{'activity.object.id': data['activity']['object']['id']}, {'activity.object': data['activity']['object']['id']}], })) - shares = [ACTOR_SERVICE.get(doc['activity']['actor']) for doc in shares] + shares = [ACTOR_SERVICE.get(doc['activity']['actor']) for doc in shares] return render_template('note.html', likes=likes, shares=shares, me=ME, thread=thread, note=data) @@ -536,7 +536,7 @@ def nodeinfo(): 'openRegistrations': False, 'usage': {'users': {'total': 1}, 'localPosts': DB.outbox.count()}, 'metadata': { - 'sourceCode': 'https://github.com/tsileo/microblog.pub', + 'sourceCode': 'https://github.com/tsileo/microblog.pub', 'nodeName': f'@{USERNAME}@{DOMAIN}', }, }), @@ -551,7 +551,7 @@ def wellknown_nodeinfo(): 'rel': 'http://nodeinfo.diaspora.software/ns/schema/2.0', 'href': f'{ID}/nodeinfo', } - + ], ) @@ -616,11 +616,11 @@ def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, -@app.route('/outbox', methods=['GET', 'POST']) -def outbox(): - if request.method == 'GET': - if not is_api_request(): - abort(404) +@app.route('/outbox', methods=['GET', 'POST']) +def outbox(): + if request.method == 'GET': + if not is_api_request(): + abort(404) # TODO(tsileo): filter the outbox if 
not authenticated # FIXME(tsileo): filter deleted, add query support for build_ordered_collection q = { @@ -639,7 +639,7 @@ def outbox(): _api_required() except BadSignature: abort(401) - + data = request.get_json(force=True) print(data) activity = activitypub.parse_activity(data) @@ -785,7 +785,7 @@ def admin(): col_followers=DB.followers.count(), col_following=DB.following.count(), ) - + @app.route('/new', methods=['GET']) @login_required @@ -833,7 +833,7 @@ def notifications(): 'meta.deleted': False, } # TODO(tsileo): also include replies via regex on Create replyTo - q = {'$or': [q, {'type': 'Follow'}, {'type': 'Accept'}, {'type': 'Undo', 'activity.object.type': 'Follow'}, + q = {'$or': [q, {'type': 'Follow'}, {'type': 'Accept'}, {'type': 'Undo', 'activity.object.type': 'Follow'}, {'type': 'Announce', 'activity.object': {'$regex': f'^{BASE_URL}'}}, {'type': 'Create', 'activity.object.inReplyTo': {'$regex': f'^{BASE_URL}'}}, ]} @@ -1004,27 +1004,27 @@ def stream(): ) -@app.route('/inbox', methods=['GET', 'POST']) -def inbox(): - if request.method == 'GET': - if not is_api_request(): - abort(404) +@app.route('/inbox', methods=['GET', 'POST']) +def inbox(): + if request.method == 'GET': + if not is_api_request(): + abort(404) try: _api_required() except BadSignature: abort(404) - return jsonify(**activitypub.build_ordered_collection( - DB.inbox, - q={'meta.deleted': False}, - cursor=request.args.get('cursor'), - map_func=lambda doc: remove_context(doc['activity']), - )) + return jsonify(**activitypub.build_ordered_collection( + DB.inbox, + q={'meta.deleted': False}, + cursor=request.args.get('cursor'), + map_func=lambda doc: remove_context(doc['activity']), + )) - data = request.get_json(force=True) + data = request.get_json(force=True) logger.debug(f'req_headers={request.headers}') logger.debug(f'raw_data={data}') - try: + try: if not verify_request(ACTOR_SERVICE): raise Exception('failed to verify request') except Exception: @@ -1039,13 +1039,13 @@ def inbox(): response=json.dumps({'error': 'failed to verify request (using HTTP signatures or fetching the IRI)'}), ) - activity = activitypub.parse_activity(data) - logger.debug(f'inbox activity={activity}/{data}') - activity.process_from_inbox() + activity = activitypub.parse_activity(data) + logger.debug(f'inbox activity={activity}/{data}') + activity.process_from_inbox() - return Response( - status=201, - ) + return Response( + status=201, + ) @app.route('/api/debug', methods=['GET', 'DELETE']) @@ -1082,17 +1082,17 @@ def api_upload(): print('upload OK') print(filename) attachment = [ - {'mediaType': mtype, - 'name': rfilename, - 'type': 'Document', + {'mediaType': mtype, + 'name': rfilename, + 'type': 'Document', 'url': BASE_URL + f'/static/media/{filename}' }, ] print(attachment) - content = request.args.get('content') - to = request.args.get('to') - note = activitypub.Note( - cc=[ID+'/followers'], + content = request.args.get('content') + to = request.args.get('to') + note = activitypub.Note( + cc=[ID+'/followers'], to=[to if to else config.AS_PUBLIC], content=content, # TODO(tsileo): handle markdown attachment=attachment, @@ -1104,30 +1104,30 @@ def api_upload(): print(create.to_dict()) create.post_to_outbox() print('posted') - + return Response( status=201, response='OK', ) -@app.route('/api/new_note', methods=['POST']) -@api_required -def api_new_note(): +@app.route('/api/new_note', methods=['POST']) +@api_required +def api_new_note(): source = _user_api_arg('content') if not source: raise ValueError('missing content') - + _reply, 
reply = None, None try: _reply = _user_api_arg('reply') except ValueError: pass - content, tags = parse_markdown(source) + content, tags = parse_markdown(source) to = request.args.get('to') cc = [ID+'/followers'] - + if _reply: reply = activitypub.parse_activity(OBJECT_SERVICE.get(_reply)) cc.append(reply.attributedTo) @@ -1136,8 +1136,8 @@ def api_new_note(): if tag['type'] == 'Mention': cc.append(tag['href']) - note = activitypub.Note( - cc=list(set(cc)), + note = activitypub.Note( + cc=list(set(cc)), to=[to if to else config.AS_PUBLIC], content=content, tag=tags, @@ -1193,20 +1193,20 @@ def api_follow(): return _user_api_response(activity=follow.id) -@app.route('/followers') -def followers(): - if is_api_request(): +@app.route('/followers') +def followers(): + if is_api_request(): return jsonify( **activitypub.build_ordered_collection( DB.followers, cursor=request.args.get('cursor'), map_func=lambda doc: doc['remote_actor'], ) - ) + ) - followers = [ACTOR_SERVICE.get(doc['remote_actor']) for doc in DB.followers.find(limit=50)] - return render_template( - 'followers.html', + followers = [ACTOR_SERVICE.get(doc['remote_actor']) for doc in DB.followers.find(limit=50)] + return render_template( + 'followers.html', me=ME, notes=DB.inbox.find({'object.object.type': 'Note'}).count(), followers=DB.followers.count(), @@ -1225,7 +1225,7 @@ def following(): map_func=lambda doc: doc['remote_actor'], ), ) - + following = [ACTOR_SERVICE.get(doc['remote_actor']) for doc in DB.following.find(limit=50)] return render_template( 'following.html', @@ -1327,13 +1327,13 @@ def get_client_id_data(url): @app.route('/indieauth/flow', methods=['POST']) -@login_required -def indieauth_flow(): - auth = dict( - scope=' '.join(request.form.getlist('scopes')), - me=request.form.get('me'), - client_id=request.form.get('client_id'), - state=request.form.get('state'), +@login_required +def indieauth_flow(): + auth = dict( + scope=' '.join(request.form.getlist('scopes')), + me=request.form.get('me'), + client_id=request.form.get('client_id'), + state=request.form.get('state'), redirect_uri=request.form.get('redirect_uri'), response_type=request.form.get('response_type'), ) @@ -1354,14 +1354,14 @@ def indieauth_flow(): return redirect(red) -# @app.route('/indieauth', methods=['GET', 'POST']) -def indieauth_endpoint(): - if request.method == 'GET': - if not session.get('logged_in'): - return redirect(url_for('login', next=request.url)) +# @app.route('/indieauth', methods=['GET', 'POST']) +def indieauth_endpoint(): + if request.method == 'GET': + if not session.get('logged_in'): + return redirect(url_for('login', next=request.url)) - me = request.args.get('me') - # FIXME(tsileo): ensure me == ID + me = request.args.get('me') + # FIXME(tsileo): ensure me == ID client_id = request.args.get('client_id') redirect_uri = request.args.get('redirect_uri') state = request.args.get('state', '') @@ -1397,7 +1397,7 @@ def indieauth_endpoint(): abort(403) return - session['logged_in'] = True + session['logged_in'] = True me = auth['me'] state = auth['state'] scope = ' '.join(auth['scope']) diff --git a/config.py b/config.py index 506a4d3..9adbff9 100644 --- a/config.py +++ b/config.py @@ -6,10 +6,9 @@ import requests from itsdangerous import JSONWebSignatureSerializer from datetime import datetime -from utils import strtobool -from utils.key import Key, KEY_DIR, get_secret_key -from utils.actor_service import ActorService -from utils.object_service import ObjectService +from little_boxes.utils import strtobool +from utils.key import 
KEY_DIR, get_key, get_secret_key + def noop(): pass @@ -17,7 +16,7 @@ def noop(): CUSTOM_CACHE_HOOKS = False try: - from cache_hooks import purge as custom_cache_purge_hook + from cache_hooks import purge as custom_cache_purge_hook except ModuleNotFoundError: custom_cache_purge_hook = noop @@ -58,8 +57,6 @@ USER_AGENT = ( f'(microblog.pub/{VERSION}; +{BASE_URL})' ) -# TODO(tsileo): use 'mongo:27017; -# mongo_client = MongoClient(host=['mongo:27017']) mongo_client = MongoClient( host=[os.getenv('MICROBLOGPUB_MONGODB_HOST', 'localhost:27017')], ) @@ -67,23 +64,26 @@ mongo_client = MongoClient( DB_NAME = '{}_{}'.format(USERNAME, DOMAIN.replace('.', '_')) DB = mongo_client[DB_NAME] + def _drop_db(): if not DEBUG_MODE: return mongo_client.drop_database(DB_NAME) -KEY = Key(USERNAME, DOMAIN, create=True) + +KEY = get_key(ID, USERNAME, DOMAIN) JWT_SECRET = get_secret_key('jwt') JWT = JSONWebSignatureSerializer(JWT_SECRET) + def _admin_jwt_token() -> str: return JWT.dumps({'me': 'ADMIN', 'ts': datetime.now().timestamp()}).decode('utf-8') # type: ignore -ADMIN_API_KEY = get_secret_key('admin_api_key', _admin_jwt_token) +ADMIN_API_KEY = get_secret_key('admin_api_key', _admin_jwt_token) ME = { "@context": [ @@ -107,13 +107,5 @@ ME = { "type": "Image", "url": ICON_URL, }, - "publicKey": { - "id": ID+"#main-key", - "owner": ID, - "publicKeyPem": KEY.pubkey_pem, - }, + "publicKey": KEY.to_dict(), } -print(ME) - -ACTOR_SERVICE = ActorService(USER_AGENT, DB.actors_cache, ID, ME, DB.instances) -OBJECT_SERVICE = ObjectService(USER_AGENT, DB.objects_cache, DB.inbox, DB.outbox, DB.instances) diff --git a/little_boxes/README.md b/little_boxes/README.md deleted file mode 100644 index 024e073..0000000 --- a/little_boxes/README.md +++ /dev/null @@ -1,37 +0,0 @@ -# Little Boxes - -Tiny ActivityPub framework written in Python, both database and server agnostic. - -## Getting Started - -```python -from little_boxes import activitypub as ap - -from mydb import db_client - - -class MyBackend(BaseBackend): - - def __init__(self, db_connection): - self.db_connection = db_connection - - def inbox_new(self, as_actor, activity): - # Save activity as "as_actor" - # [...] - - def post_to_remote_inbox(self, as_actor, payload, recipient): - # Send the activity to the remote actor - # [...] 
- - -db_con = db_client() -my_backend = MyBackend(db_con) - -ap.use_backend(my_backend) - -me = ap.Person({}) # Init an actor -outbox = ap.Outbox(me) - -follow = ap.Follow(actor=me, object='http://iri-i-want-follow') -outbox.post(follow) -``` diff --git a/little_boxes/__init__.py b/little_boxes/__init__.py deleted file mode 100644 index c30c37d..0000000 --- a/little_boxes/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -import logging - -logger = logging.getLogger(__name__) - - -def strtobool(s: str) -> bool: - if s in ['y', 'yes', 'true', 'on', '1']: - return True - if s in ['n', 'no', 'false', 'off', '0']: - return False - - raise ValueError(f'cannot convert {s} to bool') diff --git a/little_boxes/activitypub.py b/little_boxes/activitypub.py deleted file mode 100644 index 781c71e..0000000 --- a/little_boxes/activitypub.py +++ /dev/null @@ -1,1073 +0,0 @@ -"""Core ActivityPub classes.""" -import logging -import json -import binascii -import os -from datetime import datetime -from enum import Enum - -from .errors import BadActivityError -from .errors import UnexpectedActivityTypeError -from .errors import NotFromOutboxError -from .errors import ActivityNotFoundError -from .urlutils import check_url -from .utils import parse_collection - -from typing import List -from typing import Optional -from typing import Dict -from typing import Any -from typing import Union -from typing import Type - -import requests - - -logger = logging.getLogger(__name__) - -# Helper/shortcut for typing -ObjectType = Dict[str, Any] -ObjectOrIDType = Union[str, ObjectType] - -CTX_AS = 'https://www.w3.org/ns/activitystreams' -CTX_SECURITY = 'https://w3id.org/security/v1' -AS_PUBLIC = 'https://www.w3.org/ns/activitystreams#Public' - -COLLECTION_CTX = [ - "https://www.w3.org/ns/activitystreams", - "https://w3id.org/security/v1", - { - "Hashtag": "as:Hashtag", - "sensitive": "as:sensitive", - } -] - -# Will be used to keep track of all the defined activities -_ACTIVITY_CLS: Dict['ActivityTypeEnum', Type['_BaseActivity']] = {} - -BACKEND = None - -def use_backend(backend_instance): - global BACKEND - BACKEND = backend_instance - - - -class DefaultRemoteObjectFetcher(object): - """Not meant to be used on production, a caching layer, and DB shortcut fox inbox/outbox should be hooked.""" - - def __init__(self): - self._user_agent = 'Little Boxes (+https://github.com/tsileo/little_boxes)' - - def fetch(self, iri): - print('OLD FETCHER') - check_url(iri) - - resp = requests.get(iri, headers={ - 'Accept': 'application/activity+json', - 'User-Agent': self._user_agent, - }) - - if resp.status_code == 404: - raise ActivityNotFoundError(f'{iri} cannot be fetched, 404 not found error') - - resp.raise_for_status() - - return resp.json() - - -class ActivityType(Enum): - """Supported activity `type`.""" - ANNOUNCE = 'Announce' - BLOCK = 'Block' - LIKE = 'Like' - CREATE = 'Create' - UPDATE = 'Update' - PERSON = 'Person' - ORDERED_COLLECTION = 'OrderedCollection' - ORDERED_COLLECTION_PAGE = 'OrderedCollectionPage' - COLLECTION_PAGE = 'CollectionPage' - COLLECTION = 'Collection' - NOTE = 'Note' - ACCEPT = 'Accept' - REJECT = 'Reject' - FOLLOW = 'Follow' - DELETE = 'Delete' - UNDO = 'Undo' - IMAGE = 'Image' - TOMBSTONE = 'Tombstone' - - -def parse_activity(payload: ObjectType, expected: Optional[ActivityType] = None) -> 'BaseActivity': - t = ActivityType(payload['type']) - - if expected and t != expected: - raise UnexpectedActivityTypeError(f'expected a {expected.name} activity, got a {payload["type"]}') - - if t not in _ACTIVITY_CLS: - raise 
BadActivityError(f'unsupported activity type {payload["type"]}') - - activity = _ACTIVITY_CLS[t](**payload) - - return activity - - -def random_object_id() -> str: - """Generates a random object ID.""" - return binascii.hexlify(os.urandom(8)).decode('utf-8') - - -def _to_list(data: Union[List[Any], Any]) -> List[Any]: - """Helper to convert fields that can be either an object or a list of objects to a list of object.""" - if isinstance(data, list): - return data - return [data] - - -def clean_activity(activity: ObjectType) -> Dict[str, Any]: - """Clean the activity before rendering it. - - Remove the hidden bco and bcc field - """ - for field in ['bto', 'bcc']: - if field in activity: - del(activity[field]) - if activity['type'] == 'Create' and field in activity['object']: - del(activity['object'][field]) - return activity - - -def _get_actor_id(actor: ObjectOrIDType) -> str: - """Helper for retrieving an actor `id`.""" - if isinstance(actor, dict): - return actor['id'] - return actor - - -class BaseBackend(object): - """In-memory backend meant to be used for the test suite.""" - DB = {} - USERS = {} - FETCH_MOCK = {} - INBOX_IDX = {} - OUTBOX_IDX = {} - FOLLOWERS = {} - FOLLOWING = {} - - def setup_actor(self, name, pusername): - """Create a new actor in this backend.""" - p = Person( - name=name, - preferredUsername=pusername, - summary='Hello', - id=f'https://lol.com/{pusername}', - inbox=f'https://lol.com/{pusername}/inbox', - ) - - self.USERS[p.preferredUsername] = p - self.DB[p.id] = { - 'inbox': [], - 'outbox': [], - } - self.INBOX_IDX[p.id] = {} - self.OUTBOX_IDX[p.id] = {} - self.FOLLOWERS[p.id] = [] - self.FOLLOWING[p.id] = [] - self.FETCH_MOCK[p.id] = p.to_dict() - return p - - def fetch_iri(self, iri: str): - return self.FETCH_MOCK[iri] - - def get_user(self, username: str) -> 'Person': - if username in self.USERS: - return self.USERS[username] - else: - raise ValueError(f'bad username {username}') - - def outbox_is_blocked(self, as_actor: 'Person', actor_id: str) -> bool: - """Returns True if `as_actor` has blocked `actor_id`.""" - for activity in self.DB[as_actor.id]['outbox']: - if activity.ACTIVITY_TYPE == ActivityType.BLOCK: - return True - return False - - def inbox_get_by_iri(self, as_actor: 'Person', iri: str) -> Optional['BaseActivity']: - for activity in self.DB[as_actor.id]['inbox']: - if activity.id == iri: - return activity - - return None - - def inbox_new(self, as_actor: 'Person', activity: 'BaseActivity') -> None: - if activity.id in self.INBOX_IDX[as_actor.id]: - return - self.DB[as_actor.id]['inbox'].append(activity) - self.INBOX_IDX[as_actor.id][activity.id] = activity - - def activity_url(self, obj_id: str) -> str: - # from the random hex ID - return 'TODO' - - def outbox_new(self, activity: 'BaseActivity') -> None: - print(f'saving {activity!r} to DB') - actor_id = activity.get_actor().id - if activity.id in self.OUTBOX_IDX[actor_id]: - return - self.DB[actor_id]['outbox'].append(activity) - self.OUTBOX_IDX[actor_id][activity.id] = activity - - def new_follower(self, as_actor: 'Person', actor: 'Person') -> None: - self.FOLLOWERS[as_actor.id].append(actor.id) - - def undo_new_follower(self, actor: 'Person') -> None: - pass - - def new_following(self, as_actor: 'Person', actor: 'Person') -> None: - print(f'new following {actor!r}') - self.FOLLOWING[as_actor.id].append(actor.id) - - def undo_new_following(self, actor: 'Person') -> None: - pass - - def followers(self, as_actor: 'Person') -> List[str]: - return self.FOLLOWERS[as_actor.id] - - def following(self, 
as_actor: 'Person') -> List[str]: - return self.FOLLOWING[as_actor.id] - - def post_to_remote_inbox(self, payload_encoded: str, recp: str) -> None: - payload = json.loads(payload_encoded) - print(f'post_to_remote_inbox {payload} {recp}') - act = parse_activity(payload) - as_actor = parse_activity(self.fetch_iri(recp.replace('/inbox', ''))) - act.process_from_inbox(as_actor) - - def is_from_outbox(self, activity: 'BaseActivity') -> None: - pass - - def inbox_like(self, activity: 'Like') -> None: - pass - - def inbox_undo_like(self, activity: 'Like') -> None: - pass - - def outbox_like(self, activity: 'Like') -> None: - pass - - def outbox_undo_like(self, activity: 'Lke') -> None: - pass - - def inbox_announce(self, activity: 'Announce') -> None: - pass - - def inbox_undo_announce(self, activity: 'Announce') -> None: - pass - - def outbox_announce(self, activity: 'Announce') -> None: - pass - - def outbox_undo_announce(self, activity: 'Announce') -> None: - pass - - def inbox_delete(self, activity: 'Delete') -> None: - pass - - def outbox_delete(self, activity: 'Delete') -> None: - pass - - def inbox_update(self, activity: 'Update') -> None: - pass - - def outbox_update(self, activity: 'Update') -> None: - pass - - def inbox_create(self, activity: 'Create') -> None: - pass - - def outbox_create(self, activity: 'Create') -> None: - pass - - -class _ActivityMeta(type): - """Metaclass for keeping track of subclass.""" - def __new__(meta, name, bases, class_dict): - cls = type.__new__(meta, name, bases, class_dict) - - # Ensure the class has an activity type defined - if name != 'BaseActivity' and not cls.ACTIVITY_TYPE: - raise ValueError(f'class {name} has no ACTIVITY_TYPE') - - # Register it - _ACTIVITY_CLS[cls.ACTIVITY_TYPE] = cls - return cls - - -class BaseActivity(object, metaclass=_ActivityMeta): - """Base class for ActivityPub activities.""" - - ACTIVITY_TYPE: Optional[ActivityType] = None # the ActivityTypeEnum the class will represent - OBJECT_REQUIRED = False # Whether the object field is required or note - ALLOWED_OBJECT_TYPES: List[ActivityType] = [] - ACTOR_REQUIRED = True # Most of the object requires an actor, so this flag in on by default - - def __init__(self, **kwargs) -> None: - if kwargs.get('type') and kwargs.pop('type') != self.ACTIVITY_TYPE.value: - raise UnexpectedActivityTypeError(f'Expect the type to be {self.ACTIVITY_TYPE.value!r}') - - # Initialize the dict that will contains all the activity fields - self._data: Dict[str, Any] = { - 'type': self.ACTIVITY_TYPE.value - } - logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs!r}') - - # A place to set ephemeral data - self.__ctx = {} - - # The id may not be present for new activities - if 'id' in kwargs: - self._data['id'] = kwargs.pop('id') - - if self.ACTIVITY_TYPE != ActivityType.PERSON and self.ACTOR_REQUIRED: - actor = kwargs.get('actor') - if actor: - kwargs.pop('actor') - actor = self._validate_person(actor) - self._data['actor'] = actor - else: - raise BadActivityError('missing actor') - - if self.OBJECT_REQUIRED and 'object' in kwargs: - obj = kwargs.pop('object') - if isinstance(obj, str): - # The object is a just a reference the its ID/IRI - # FIXME(tsileo): fetch the ref - self._data['object'] = obj - else: - if not self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError('unexpected object') - if 'type' not in obj or (self.ACTIVITY_TYPE != ActivityType.CREATE and 'id' not in obj): - raise BadActivityError('invalid object, missing type') - if ActivityType(obj['type']) not in 
self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError( - f'unexpected object type {obj["type"]} (allowed={self.ALLOWED_OBJECT_TYPES!r})' - ) - self._data['object'] = obj - - if '@context' not in kwargs: - self._data['@context'] = CTX_AS - else: - self._data['@context'] = kwargs.pop('@context') - - # @context check - if not isinstance(self._data['@context'], list): - self._data['@context'] = [self._data['@context']] - if CTX_SECURITY not in self._data['@context']: - self._data['@context'].append(CTX_SECURITY) - if isinstance(self._data['@context'][-1], dict): - self._data['@context'][-1]['Hashtag'] = 'as:Hashtag' - self._data['@context'][-1]['sensitive'] = 'as:sensitive' - else: - self._data['@context'].append({'Hashtag': 'as:Hashtag', 'sensitive': 'as:sensitive'}) - - # FIXME(tsileo): keys required for some subclasses? - allowed_keys = None - try: - allowed_keys = self._init(**kwargs) - logger.debug('calling custom init') - except NotImplementedError: - pass - - if allowed_keys: - # Allows an extra to (like for Accept and Follow) - kwargs.pop('to', None) - if len(set(kwargs.keys()) - set(allowed_keys)) > 0: - raise BadActivityError(f'extra data left: {kwargs!r}') - else: - # Remove keys with `None` value - valid_kwargs = {} - for k, v in kwargs.items(): - if v is None: - continue - valid_kwargs[k] = v - self._data.update(**valid_kwargs) - - def ctx(self) -> Dict[str, Any]: - return self.__ctx - - def set_ctx(self, ctx: Dict[str, Any]) -> None: - self.__ctx = ctx - - def _init(self, **kwargs) -> Optional[List[str]]: - """Optional init callback that may returns a list of allowed keys.""" - raise NotImplementedError - - def __repr__(self) -> str: - """Pretty repr.""" - return '{}({!r})'.format(self.__class__.__qualname__, self._data.get('id')) - - def __str__(self) -> str: - """Returns the ID/IRI when castign to str.""" - return str(self._data.get('id', f'[new {self.ACTIVITY_TYPE} activity]')) - - def __getattr__(self, name: str) -> Any: - """Allow to access the object field as regular attributes.""" - if self._data.get(name): - return self._data.get(name) - - def _outbox_set_id(self, uri: str, obj_id: str) -> None: - """Optional callback for subclasses to so something with a newly generated ID (for outbox activities).""" - raise NotImplementedError - - def outbox_set_id(self, uri: str, obj_id: str) -> None: - """Set the ID for a new activity.""" - logger.debug(f'setting ID {uri} / {obj_id}') - self._data['id'] = uri - try: - self._outbox_set_id(uri, obj_id) - except NotImplementedError: - pass - - def _actor_id(self, obj: ObjectOrIDType) -> str: - if isinstance(obj, dict) and obj['type'] == ActivityType.PERSON.value: - obj_id = obj.get('id') - if not obj_id: - raise BadActivityError(f'missing object id: {obj!r}') - return obj_id - elif isinstance(obj, str): - return obj - else: - raise BadActivityError(f'invalid "actor" field: {obj!r}') - - def _validate_person(self, obj: ObjectOrIDType) -> str: - obj_id = self._actor_id(obj) - try: - actor = BACKEND.fetch_iri(obj_id) - except Exception: - raise BadActivityError(f'failed to validate actor {obj!r}') - - if not actor or 'id' not in actor: - raise BadActivityError(f'invalid actor {actor}') - - return actor['id'] - - def get_object(self) -> 'BaseActivity': - """Returns the object as a BaseActivity instance.""" - if self.__obj: - return self.__obj - if isinstance(self._data['object'], dict): - p = parse_activity(self._data['object']) - else: - obj = BACKEND.fetch_iri(self._data['object']) - if ActivityType(obj.get('type')) not in 
self.ALLOWED_OBJECT_TYPES: - raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")!r}') - p = parse_activity(obj) - - self.__obj: Optional['BaseActivity'] = p - return p - - def reset_object_cache(self) -> None: - self.__obj = None - - def to_dict(self, embed: bool = False, embed_object_id_only: bool = False) -> ObjectType: - """Serializes the activity back to a dict, ready to be JSON serialized.""" - data = dict(self._data) - if embed: - for k in ['@context', 'signature']: - if k in data: - del(data[k]) - if data.get('object') and embed_object_id_only and isinstance(data['object'], dict): - try: - data['object'] = data['object']['id'] - except KeyError: - raise BadActivityError(f'embedded object {data["object"]!r} should have an id') - - return data - - def get_actor(self) -> 'BaseActivity': - # FIXME(tsileo): cache the actor (same way as get_object) - actor = self._data.get('actor') - if not actor and self.ACTOR_REQUIRED: - # Quick hack for Note objects - if self.ACTIVITY_TYPE == ActivityType.NOTE: - actor = str(self._data.get('attributedTo')) - else: - raise BadActivityError(f'failed to fetch actor: {self._data!r}') - - actor_id = self._actor_id(actor) - return Person(**BACKEND.fetch_iri(actor_id)) - - def _pre_post_to_outbox(self) -> None: - raise NotImplementedError - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - raise NotImplementedError - - def _undo_outbox(self) -> None: - raise NotImplementedError - - def _pre_process_from_inbox(self, as_actor: 'Person') -> None: - raise NotImplementedError - - def _process_from_inbox(self, as_actor: 'Person') -> None: - raise NotImplementedError - - def _undo_inbox(self) -> None: - raise NotImplementedError - - def process_from_inbox(self, as_actor: 'Person') -> None: - """Process the message posted to `as_actor` inbox.""" - logger.debug(f'calling main process from inbox hook for {self}') - actor = self.get_actor() - - # Check for Block activity - if BACKEND.outbox_is_blocked(as_actor, actor.id): - # TODO(tsileo): raise ActorBlockedError? 
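The TODO above suggests the blocked-actor check could raise instead of silently returning. A minimal sketch of that idea, assuming the external little-boxes package exposes the same `ActorBlockedError` as the vendored `little_boxes/errors.py` removed later in this diff; `backend` and `as_actor` are stand-ins for whatever objects the caller provides:

```python
# Sketch only, not the project's actual implementation.
from little_boxes.errors import ActorBlockedError  # assumed to match the vendored errors module


def ensure_not_blocked(backend, as_actor, activity) -> None:
    """Raise ActorBlockedError instead of silently dropping a blocked actor's activity."""
    actor = activity.get_actor()
    if backend.outbox_is_blocked(as_actor, actor.id):
        # Error.to_dict() lets the HTTP layer turn this into a 4xx JSON response
        raise ActorBlockedError(f"actor {actor.id} is blocked, rejecting {activity!r}")
```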
- logger.info(f'actor {actor!r} is blocked, dropping the received activity {self!r}') - return - - if BACKEND.inbox_get_by_iri(as_actor, self.id): - # The activity is already in the inbox - logger.info(f'received duplicate activity {self}, dropping it') - return - - try: - self._pre_process_from_inbox(as_actor) - logger.debug('called pre process from inbox hook') - except NotImplementedError: - logger.debug('pre process from inbox hook not implemented') - - BACKEND.inbox_new(as_actor, self) - logger.info('activity {self!r} saved') - - try: - self._process_from_inbox(as_actor) - logger.debug('called process from inbox hook') - except NotImplementedError: - logger.debug('process from inbox hook not implemented') - - def post_to_outbox(self) -> None: - logger.debug(f'calling main post to outbox hook for {self}') - - # Assign create a random ID - obj_id = random_object_id() - self.outbox_set_id(BACKEND.activity_url(obj_id), obj_id) - - try: - self._pre_post_to_outbox() - logger.debug(f'called pre post to outbox hook') - except NotImplementedError: - logger.debug('pre post to outbox hook not implemented') - - BACKEND.outbox_new(self) - - recipients = self.recipients() - logger.info(f'recipients={recipients}') - activity = clean_activity(self.to_dict()) - - try: - self._post_to_outbox(obj_id, activity, recipients) - logger.debug(f'called post to outbox hook') - except NotImplementedError: - logger.debug('post to outbox hook not implemented') - - payload = json.dumps(activity) - for recp in recipients: - logger.debug(f'posting to {recp}') - - BACKEND.post_to_remote_inbox(payload, recp) - - def _recipients(self) -> List[str]: - return [] - - def recipients(self) -> List[str]: - recipients = self._recipients() - actor_id = self.get_actor().id - - out: List[str] = [] - for recipient in recipients: - # if recipient in PUBLIC_INSTANCES: - # if recipient not in out: - # out.append(str(recipient)) - # continue - if recipient in [actor_id, AS_PUBLIC, None]: - continue - if isinstance(recipient, Person): - if recipient.id == actor_id: - continue - actor = recipient - else: - raw_actor = BACKEND.fetch_iri(recipient) - if raw_actor['type'] == ActivityType.PERSON.value: - actor = Person(**raw_actor) - - if actor.endpoints: - shared_inbox = actor.endpoints.get('sharedInbox') - if shared_inbox not in out: - out.append(shared_inbox) - continue - - if actor.inbox and actor.inbox not in out: - out.append(actor.inbox) - - # Is the activity a `Collection`/`OrderedCollection`? 
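The branch that follows expands a `Collection`/`OrderedCollection` recipient into individual inboxes, preferring `sharedInbox`. A rough sketch of that resolution, assuming the external `parse_collection` keeps the signature of the vendored copy deleted further down in this diff; `fetch` is a stand-in for the backend's `fetch_iri`:

```python
# Sketch only: expand a collection of actors into deliverable inbox URLs.
from little_boxes.collection import parse_collection  # signature assumed from the vendored utils.py


def inboxes_for_collection(collection_iri: str, fetch) -> list:
    out: list = []
    for item in parse_collection(url=collection_iri, fetcher=fetch):
        actor = fetch(item)
        endpoints = actor.get("endpoints") or {}
        # Prefer the shared inbox so one POST can reach a whole instance
        inbox = endpoints.get("sharedInbox") or actor.get("inbox")
        if inbox and inbox not in out:
            out.append(inbox)
    return out
```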
- elif raw_actor['type'] in [ActivityType.COLLECTION.value, - ActivityType.ORDERED_COLLECTION.value]: - for item in parse_collection(raw_actor): - if item in [actor_id, AS_PUBLIC]: - continue - try: - col_actor = Person(**BACKEND.fetch_iri(item)) - except UnexpectedActivityTypeError: - logger.exception(f'failed to fetch actor {item!r}') - - if col_actor.endpoints: - shared_inbox = col_actor.endpoints.get('sharedInbox') - if shared_inbox not in out: - out.append(shared_inbox) - continue - if col_actor.inbox and col_actor.inbox not in out: - out.append(col_actor.inbox) - else: - raise BadActivityError(f'failed to parse {raw_actor!r}') - - return out - - def build_undo(self) -> 'BaseActivity': - raise NotImplementedError - - def build_delete(self) -> 'BaseActivity': - raise NotImplementedError - - -class Person(BaseActivity): - ACTIVITY_TYPE = ActivityType.PERSON - OBJECT_REQUIRED = False - ACTOR_REQUIRED = False - - -class Block(BaseActivity): - ACTIVITY_TYPE = ActivityType.BLOCK - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - -class Collection(BaseActivity): - ACTIVITY_TYPE = ActivityType.COLLECTION - OBJECT_REQUIRED = False - ACTOR_REQUIRED = False - - -class Image(BaseActivity): - ACTIVITY_TYPE = ActivityType.IMAGE - OBJECT_REQUIRED = False - ACTOR_REQUIRED = False - - def _init(self, **kwargs): - self._data.update( - url=kwargs.pop('url'), - ) - - def __repr__(self): - return 'Image({!r})'.format(self._data.get('url')) - - -class Follow(BaseActivity): - ACTIVITY_TYPE = ActivityType.FOLLOW - ALLOWED_OBJECT_TYPES = [ActivityType.PERSON] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _build_reply(self, reply_type: ActivityType) -> BaseActivity: - if reply_type == ActivityType.ACCEPT: - return Accept( - actor=self.get_object().id, - object=self.to_dict(embed=True), - ) - - raise ValueError(f'type {reply_type} is invalid for building a reply') - - def _recipients(self) -> List[str]: - return [self.get_object().id] - - def _process_from_inbox(self, as_actor: 'Person') -> None: - """Receiving a Follow should trigger an Accept.""" - accept = self.build_accept() - accept.post_to_outbox() - - remote_actor = self.get_actor() - - # ABC - BACKEND.new_follower(as_actor, remote_actor) - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - # XXX The new_following event will be triggered by Accept - pass - - def _undo_inbox(self) -> None: - # ABC - self.undo_new_follower(self.get_actor()) - - def _undo_outbox(self) -> None: - # ABC - self.undo_new_following(self.get_actor()) - - def build_accept(self) -> BaseActivity: - return self._build_reply(ActivityType.ACCEPT) - - def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True)) - - -class Accept(BaseActivity): - ACTIVITY_TYPE = ActivityType.ACCEPT - ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _recipients(self) -> List[str]: - return [self.get_object().get_actor().id] - - def _pre_process_from_inbox(self, as_actor: 'Person') -> None: - # FIXME(tsileo): ensure the actor match the object actor - pass - - def _process_from_inbox(self, as_actor: 'Person') -> None: - BACKEND.new_following(as_actor, self.get_actor()) - - -class Undo(BaseActivity): - ACTIVITY_TYPE = ActivityType.UNDO - ALLOWED_OBJECT_TYPES = [ActivityType.FOLLOW, ActivityType.LIKE, ActivityType.ANNOUNCE] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _recipients(self) -> List[str]: - obj = self.get_object() - if obj.type_enum == 
ActivityType.FOLLOW: - return [obj.get_object().id] - else: - return [obj.get_object().get_actor().id] - # TODO(tsileo): handle like and announce - raise Exception('TODO') - - def _pre_process_from_inbox(self, as_actor: 'Person') -> None: - """Ensures an Undo activity comes from the same actor as the updated activity.""" - obj = self.get_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot update {obj!r}') - - def _process_from_inbox(self, as_actor: 'Person') -> None: - obj = self.get_object() - # FIXME(tsileo): move this to _undo_inbox impl - # DB.inbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}}) - - try: - obj._undo_inbox() - except NotImplementedError: - pass - - def _pre_post_to_outbox(self) -> None: - """Ensures an Undo activity references an activity owned by the instance.""" - # ABC - if not self.is_from_outbox(self): - raise NotFromOutboxError(f'object {self!r} is not owned by this instance') - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - logger.debug('processing undo to outbox') - logger.debug('self={}'.format(self)) - obj = self.get_object() - logger.debug('obj={}'.format(obj)) - - # FIXME(tsileo): move this to _undo_inbox impl - # DB.outbox.update_one({'remote_id': obj.id}, {'$set': {'meta.undo': True}}) - - try: - obj._undo_outbox() - logger.debug(f'_undo_outbox called for {obj}') - except NotImplementedError: - logger.debug(f'_undo_outbox not implemented for {obj}') - pass - - -class Like(BaseActivity): - ACTIVITY_TYPE = ActivityType.LIKE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _recipients(self) -> List[str]: - return [self.get_object().get_actor().id] - - def _process_from_inbox(self, as_actor: 'Person') -> None: - # ABC - self.inbox_like(self) - - def _undo_inbox(self) -> None: - # ABC - self.inbox_undo_like(self) - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]): - # ABC - self.outbox_like(self) - - def _undo_outbox(self) -> None: - # ABC - self.outbox_undo_like(self) - - def build_undo(self) -> BaseActivity: - return Undo( - object=self.to_dict(embed=True, embed_object_id_only=True), - actor=self.get_actor().id, - ) - - -class Announce(BaseActivity): - ACTIVITY_TYPE = ActivityType.ANNOUNCE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _recipients(self) -> List[str]: - recipients = [] - - for field in ['to', 'cc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - return recipients - - def _process_from_inbox(self, as_actor: 'Person') -> None: - if isinstance(self._data['object'], str) and not self._data['object'].startswith('http'): - # TODO(tsileo): actually drop it without storing it and better logging, also move the check somewhere else - logger.warn( - f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message' - ) - return - - # ABC - self.inbox_announce(self) - - def _undo_inbox(self) -> None: - # ABC - self.inbox_undo_annnounce(self) - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - # ABC - self.outbox_announce(self) - - def _undo_outbox(self) -> None: - # ABC - self.outbox_undo_announce(self) - - def build_undo(self) -> BaseActivity: - return Undo(object=self.to_dict(embed=True)) - - -class Delete(BaseActivity): - ACTIVITY_TYPE = ActivityType.DELETE - 
ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.TOMBSTONE] - OBJECT_REQUIRED = True - - def _get_actual_object(self) -> BaseActivity: - # FIXME(tsileo): overrides get_object instead? - obj = self.get_object() - if obj.type_enum == ActivityType.TOMBSTONE: - obj = parse_activity(BACKEND.fetch_iri(obj.id)) - return obj - - def _recipients(self) -> List[str]: - obj = self._get_actual_object() - return obj._recipients() - - def _pre_process_from_inbox(self, as_actor: 'Person') -> None: - """Ensures a Delete activity comes from the same actor as the deleted activity.""" - obj = self._get_actual_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot delete {obj!r}') - - def _process_from_inbox(self, as_actor: 'Person') -> None: - # ABC - self.inbox_delete(self) - # FIXME(tsileo): handle the delete_threads here? - - def _pre_post_to_outbox(self) -> None: - """Ensures the Delete activity references a activity from the outbox (i.e. owned by the instance).""" - obj = self._get_actual_object() - # ABC - if not self.is_from_outbox(self): - raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - # ABC - self.outbox_delete(self) - - -class Update(BaseActivity): - ACTIVITY_TYPE = ActivityType.UPDATE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.PERSON] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _pre_process_from_inbox(self, as_actor: 'Person') -> None: - """Ensures an Update activity comes from the same actor as the updated activity.""" - obj = self.get_object() - actor = self.get_actor() - if actor.id != obj.get_actor().id: - raise BadActivityError(f'{actor!r} cannot update {obj!r}') - - def _process_from_inbox(self, as_actor: 'Person') -> None: - # ABC - self.inbox_update(self) - - def _pre_post_to_outbox(self) -> None: - # ABC - if not self.is_form_outbox(self): - raise NotFromOutboxError(f'object {self!r} is not owned by this instance') - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - # ABC - self.outbox_update(self) - - -class Create(BaseActivity): - ACTIVITY_TYPE = ActivityType.CREATE - ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] - OBJECT_REQUIRED = True - ACTOR_REQUIRED = True - - def _set_id(self, uri: str, obj_id: str) -> None: - self._data['object']['id'] = uri + '/activity' - # ABC - self._data['object']['url'] = self.note_url(self) - self.reset_object_cache() - - def _init(self, **kwargs): - obj = self.get_object() - if not obj.attributedTo: - self._data['object']['attributedTo'] = self.get_actor().id - if not obj.published: - if self.published: - self._data['object']['published'] = self.published - else: - now = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z' - self._data['published'] = now - self._data['object']['published'] = now - - def _recipients(self) -> List[str]: - # TODO(tsileo): audience support? 
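`Create._recipients` below merges every addressing field into one flat list. A self-contained sketch of that merge, mirroring the `_to_list` helper defined near the top of this file (the example payload is made up):

```python
# Sketch only: flatten to/cc/bto/bcc into a unique recipient list.
def gather_recipients(activity: dict) -> list:
    recipients: list = []
    for field in ("to", "cc", "bto", "bcc"):
        value = activity.get(field)
        if value is None:
            continue
        if not isinstance(value, list):  # same normalization as _to_list
            value = [value]
        for recipient in value:
            if recipient not in recipients:
                recipients.append(recipient)
    return recipients


# gather_recipients({"to": ["https://www.w3.org/ns/activitystreams#Public"],
#                    "cc": "https://example.com/followers"})
# -> ["https://www.w3.org/ns/activitystreams#Public", "https://example.com/followers"]
```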
- recipients = [] - for field in ['to', 'cc', 'bto', 'bcc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - recipients.extend(self.get_object()._recipients()) - - return recipients - - def _process_from_inbox(self, as_actor: 'Person') -> None: - # ABC - self.inbox_create(self) - - def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: - # ABC - self.outbox_create(self) - - -class Tombstone(BaseActivity): - ACTIVITY_TYPE = ActivityType.TOMBSTONE - ACTOR_REQUIRED = False - OBJECT_REQUIRED = False - - -class Note(BaseActivity): - ACTIVITY_TYPE = ActivityType.NOTE - ACTOR_REQUIRED = True - OBJECT_REQURIED = False - - def _init(self, **kwargs): - print(self._data) - # Remove the `actor` field as `attributedTo` is used for `Note` instead - if 'actor' in self._data: - del(self._data['actor']) - if 'sensitive' not in kwargs: - self._data['sensitive'] = False - - def _recipients(self) -> List[str]: - # TODO(tsileo): audience support? - recipients: List[str] = [] - - # FIXME(tsileo): re-add support for the PUBLIC_INSTANCES - # If the note is public, we publish it to the defined "public instances" - # if AS_PUBLIC in self._data.get('to', []): - # recipients.extend(PUBLIC_INSTANCES) - # print('publishing to public instances') - # print(recipients) - - for field in ['to', 'cc', 'bto', 'bcc']: - if field in self._data: - recipients.extend(_to_list(self._data[field])) - - return recipients - - def build_create(self) -> BaseActivity: - """Wraps an activity in a Create activity.""" - create_payload = { - 'object': self.to_dict(embed=True), - 'actor': self.attributedTo, - } - for field in ['published', 'to', 'bto', 'cc', 'bcc', 'audience']: - if field in self._data: - create_payload[field] = self._data[field] - - return Create(**create_payload) - - def build_like(self) -> BaseActivity: - return Like(object=self.id) - - def build_announce(self) -> BaseActivity: - return Announce( - object=self.id, - to=[AS_PUBLIC], - cc=[self.follower_collection_id(self.get_actor()), self.attributedTo], # ABC - published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z', - ) - - def build_delete(self) -> BaseActivity: - return Delete(object=Tombstone(id=self.id).to_dict(embed=True)) - - def get_tombstone(self, deleted: Optional[str]) -> BaseActivity: - return Tombstone( - id=self.id, - published=self.published, - deleted=deleted, - updated=deleted, - ) - - -class Box(object): - def __init__(self, actor: Person): - self.actor = actor - - -class Outbox(Box): - - def post(self, activity: BaseActivity) -> None: - if activity.get_actor().id != self.actor.id: - raise ValueError(f'{activity.get_actor()!r} cannot post into {self.actor!r} outbox') - - activity.post_to_outbox() - - def get(self, activity_iri: str) -> BaseActivity: - pass - - def collection(self): - # TODO(tsileo): figure out an API - pass - - -class Inbox(Box): - - def post(self, activity: BaseActivity) -> None: - activity.process_from_inbox(self.actor) diff --git a/little_boxes/errors.py b/little_boxes/errors.py deleted file mode 100644 index cb3cb34..0000000 --- a/little_boxes/errors.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Errors raised by this package.""" -from typing import Optional -from typing import Dict -from typing import Any - - -class Error(Exception): - """HTTP-friendly base error, with a status code, a message and an optional payload.""" - status_code = 400 - - def __init__( - self, message: str, - status_code: Optional[int] = None, - payload: Optional[Dict[str, Any]] = None, - ) 
-> None: - Exception.__init__(self) - self.message = message - if status_code is not None: - self.status_code = status_code - self.payload = payload - - def to_dict(self) -> Dict[str, Any]: - rv = dict(self.payload or ()) - rv['message'] = self.message - return rv - - def __repr__(self) -> str: - return ( - f'{self.__class__.__qualname__}({self.message!r}, payload={self.payload!r}, status_code={self.status_code})' - ) - - -class ActorBlockedError(Error): - """Raised when an activity from a blocked actor is received.""" - - -class NotFromOutboxError(Error): - """Raised when an activity targets an object from the inbox when an object from the oubox was expected.""" - - -class ActivityNotFoundError(Error): - """Raised when an activity is not found.""" - status_code = 404 - - -class BadActivityError(Error): - """Raised when an activity could not be parsed/initialized.""" - - -class RecursionLimitExceededError(BadActivityError): - """Raised when the recursion limit for fetching remote object was exceeded (likely a collection).""" - - -class UnexpectedActivityTypeError(BadActivityError): - """Raised when an another activty was expected.""" diff --git a/little_boxes/urlutils.py b/little_boxes/urlutils.py deleted file mode 100644 index 99f900d..0000000 --- a/little_boxes/urlutils.py +++ /dev/null @@ -1,47 +0,0 @@ -import logging -import os -import socket -import ipaddress -from urllib.parse import urlparse - -from . import strtobool -from .errors import Error - -logger = logging.getLogger(__name__) - - -class InvalidURLError(Error): - pass - - -def is_url_valid(url: str) -> bool: - parsed = urlparse(url) - if parsed.scheme not in ['http', 'https']: - return False - - # XXX in debug mode, we want to allow requests to localhost to test the federation with local instances - debug_mode = strtobool(os.getenv('MICROBLOGPUB_DEBUG', 'false')) - if debug_mode: - return True - - if parsed.hostname in ['localhost']: - return False - - try: - ip_address = socket.getaddrinfo(parsed.hostname, parsed.port or 80)[0][4][0] - except socket.gaierror: - logger.exception(f'failed to lookup url {url}') - return False - - if ipaddress.ip_address(ip_address).is_private: - logger.info(f'rejecting private URL {url}') - return False - - return True - - -def check_url(url: str) -> None: - if not is_url_valid(url): - raise InvalidURLError(f'"{url}" is invalid') - - return None diff --git a/little_boxes/utils.py b/little_boxes/utils.py deleted file mode 100644 index 2476182..0000000 --- a/little_boxes/utils.py +++ /dev/null @@ -1,60 +0,0 @@ -"""Contains some ActivityPub related utils.""" -from typing import Optional -from typing import Callable -from typing import Dict -from typing import List -from typing import Any - - -from .errors import RecursionLimitExceededError -from .errors import UnexpectedActivityTypeError - - -def parse_collection( - payload: Optional[Dict[str, Any]] = None, - url: Optional[str] = None, - level: int = 0, - fetcher: Optional[Callable[[str], Dict[str, Any]]] = None, -) -> List[Any]: - """Resolve/fetch a `Collection`/`OrderedCollection`.""" - if not fetcher: - raise Exception('must provide a fetcher') - if level > 3: - raise RecursionLimitExceededError('recursion limit exceeded') - - # Go through all the pages - out: List[Any] = [] - if url: - payload = fetcher(url) - if not payload: - raise ValueError('must at least prove a payload or an URL') - - if payload['type'] in ['Collection', 'OrderedCollection']: - if 'orderedItems' in payload: - return payload['orderedItems'] - if 'items' in payload: - 
return payload['items'] - if 'first' in payload: - if 'orderedItems' in payload['first']: - out.extend(payload['first']['orderedItems']) - if 'items' in payload['first']: - out.extend(payload['first']['items']) - n = payload['first'].get('next') - if n: - out.extend(parse_collection(url=n, level=level+1, fetcher=fetcher)) - return out - - while payload: - if payload['type'] in ['CollectionPage', 'OrderedCollectionPage']: - if 'orderedItems' in payload: - out.extend(payload['orderedItems']) - if 'items' in payload: - out.extend(payload['items']) - n = payload.get('next') - if n is None: - break - payload = fetcher(n) - else: - raise UnexpectedActivityTypeError('unexpected activity type {}'.format(payload['type'])) - - return out diff --git a/requirements.txt b/requirements.txt index 425405f..eb16141 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,21 +2,19 @@ libsass gunicorn piexif requests -markdown python-u2flib-server Flask Flask-WTF Celery pymongo -pyld timeago bleach -pycryptodome html2text feedgen itsdangerous bcrypt mf2py passlib -pyyaml git+https://github.com/erikriver/opengraph.git +git+https://github.com/tsileo/little-boxes.git +pyyaml diff --git a/test_little_boxes.py b/test_little_boxes.py deleted file mode 100644 index b297c8f..0000000 --- a/test_little_boxes.py +++ /dev/null @@ -1,26 +0,0 @@ -from little_boxes.activitypub import use_backend -from little_boxes.activitypub import BaseBackend -from little_boxes.activitypub import Outbox -from little_boxes.activitypub import Person -from little_boxes.activitypub import Follow - -def test_little_boxes_follow(): - back = BaseBackend() - use_backend(back) - - me = back.setup_actor('Thomas', 'tom') - - other = back.setup_actor('Thomas', 'tom2') - - outbox = Outbox(me) - f = Follow( - actor=me.id, - object=other.id, - ) - - outbox.post(f) - assert back.followers(other) == [me.id] - assert back.following(other) == [] - - assert back.followers(me) == [] - assert back.following(me) == [other.id] diff --git a/utils/key.py b/utils/key.py index f5a2455..18162a5 100644 --- a/utils/key.py +++ b/utils/key.py @@ -1,22 +1,23 @@ import os import binascii -from Crypto.PublicKey import RSA from typing import Callable -KEY_DIR = os.path.join( - os.path.dirname(os.path.abspath(__file__)), '..', 'config' -) +from little_boxes.key import Key + +KEY_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "config") def _new_key() -> str: - return binascii.hexlify(os.urandom(32)).decode('utf-8') + return binascii.hexlify(os.urandom(32)).decode("utf-8") + def get_secret_key(name: str, new_key: Callable[[], str] = _new_key) -> str: - key_path = os.path.join(KEY_DIR, f'{name}.key') + """Loads or generates a cryptographic key.""" + key_path = os.path.join(KEY_DIR, f"{name}.key") if not os.path.exists(key_path): k = new_key() - with open(key_path, 'w+') as f: + with open(key_path, "w+") as f: f.write(k) return k @@ -24,23 +25,19 @@ def get_secret_key(name: str, new_key: Callable[[], str] = _new_key) -> str: return f.read() -class Key(object): - DEFAULT_KEY_SIZE = 2048 - def __init__(self, user: str, domain: str, create: bool = True) -> None: - user = user.replace('.', '_') - domain = domain.replace('.', '_') - key_path = os.path.join(KEY_DIR, f'key_{user}_{domain}.pem') - if os.path.isfile(key_path): - with open(key_path) as f: - self.privkey_pem = f.read() - self.privkey = RSA.importKey(self.privkey_pem) - self.pubkey_pem = self.privkey.publickey().exportKey('PEM').decode('utf-8') - else: - if not create: - raise Exception('must init 
private key first')
-            k = RSA.generate(self.DEFAULT_KEY_SIZE)
-            self.privkey_pem = k.exportKey('PEM').decode('utf-8')
-            self.pubkey_pem = k.publickey().exportKey('PEM').decode('utf-8')
-            with open(key_path, 'w') as f:
-                f.write(self.privkey_pem)
-            self.privkey = k
+def get_key(owner: str, user: str, domain: str) -> Key:
+    """Loads or generates an RSA key."""
+    k = Key(owner)
+    user = user.replace(".", "_")
+    domain = domain.replace(".", "_")
+    key_path = os.path.join(KEY_DIR, f"key_{user}_{domain}.pem")
+    if os.path.isfile(key_path):
+        with open(key_path) as f:
+            privkey_pem = f.read()
+        k.load(privkey_pem)
+    else:
+        k.new()
+        with open(key_path, "w") as f:
+            f.write(k.privkey_pem)
+
+    return k
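For context, `config.py` earlier in this diff consumes the new helper as sketched below. The values are placeholders; `Key.load`, `Key.new` and `Key.to_dict` come from the external little-boxes package:

```python
# Sketch of how config.py uses the new helper (placeholder values).
from utils.key import get_key

ID = "https://example.com"   # placeholder, normally derived from the instance config
USERNAME = "alice"
DOMAIN = "example.com"

# Loads config/key_alice_example_com.pem if present, otherwise generates and saves it
KEY = get_key(ID, USERNAME, DOMAIN)

# Serialized public key, embedded as "publicKey" in the actor document (ME)
public_key = KEY.to_dict()
```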