[WIP] More work on the ActivityPub module

This commit is contained in:
Thomas Sileo 2018-06-08 21:33:46 +02:00
parent d4bf73756f
commit 2abf0fffd2
5 changed files with 187 additions and 282 deletions

View file

@ -0,0 +1,12 @@
import logging
logger = logging.getLogger(__name__)
def strtobool(s: str) -> bool:
if s in ['y', 'yes', 'true', 'on', '1']:
return True
if s in ['n', 'no', 'false', 'off', '0']:
return False
raise ValueError(f'cannot convert {s} to bool')

View file

@ -9,7 +9,7 @@ from enum import Enum
from .errors import BadActivityError from .errors import BadActivityError
from .errors import UnexpectedActivityTypeError from .errors import UnexpectedActivityTypeError
from .errors import NotFromOutboxError from .errors import NotFromOutboxError
from . import utils from .utils import parse_collection
from .remote_object import OBJECT_FETCHER from .remote_object import OBJECT_FETCHER
from typing import List from typing import List
@ -109,6 +109,84 @@ def _get_actor_id(actor: ObjectOrIDType) -> str:
return actor return actor
class Actions(object):
def outbox_is_blocked(self, actor_id: str) -> bool:
return False
def inbox_get_by_iri(self, iri: str) -> 'BaseActivity':
pass
def inbox_new(self, activity: 'BaseActivity') -> None:
pass
def activity_url(self, obj_id: str) -> str:
# from the random hex ID
return 'TODO'
def outbox_new(self, activity: 'BaseActivity') -> None:
pass
def new_follower(self, actor: 'Person') -> None:
pass
def undo_new_follower(self, actor: 'Person') -> None:
pass
def new_following(self, actor: 'Person') -> None:
pass
def undo_new_following(self, actor: 'Person') -> None:
pass
def post_to_remote_inbox(self, payload: ObjectType, recp: str) -> None:
pass
def is_from_outbox(self, activity: 'BaseActivity') -> None:
pass
def inbox_like(self, activity: 'Like') -> None:
pass
def inbox_undo_like(self, activity: 'Like') -> None:
pass
def outbox_like(self, activity: 'Like') -> None:
pass
def outbox_undo_like(self, activity: 'Lke') -> None:
pass
def inbox_announce(self, activity: 'Announce') -> None:
pass
def inbox_undo_announce(self, activity: 'Announce') -> None:
pass
def outbox_announce(self, activity: 'Announce') -> None:
pass
def outbox_undo_announce(self, activity: 'Announce') -> None:
pass
def inbox_delete(self, activity: 'Delete') -> None:
pass
def outbox_delete(self, activity: 'Delete') -> None:
pass
def inbox_update(self, activity: 'Update') -> None:
pass
def outbox_update(self, activity: 'Update') -> None:
pass
def inbox_create(self, activity: 'Create') -> None:
pass
def outbox_create(self, activity: 'Create') -> None:
pass
class _ActivityMeta(type): class _ActivityMeta(type):
"""Metaclass for keeping track of subclass.""" """Metaclass for keeping track of subclass."""
def __new__(meta, name, bases, class_dict): def __new__(meta, name, bases, class_dict):
@ -119,16 +197,16 @@ class _ActivityMeta(type):
raise ValueError(f'class {name} has no ACTIVITY_TYPE') raise ValueError(f'class {name} has no ACTIVITY_TYPE')
# Register it # Register it
_REGISTER[cls.ACTIVITY_TYPE] = cls _ACTIVITY_CLS[cls.ACTIVITY_TYPE] = cls
return cls return cls
class _BaseActivity(object, metaclass=_ActivityMeta): class BaseActivity(object, metaclass=_ActivityMeta):
"""Base class for ActivityPub activities.""" """Base class for ActivityPub activities."""
ACTIVITY_TYPE: Optional[ActivityType] = None # the ActivityTypeEnum the class will represent ACTIVITY_TYPE: Optional[ActivityType] = None # the ActivityTypeEnum the class will represent
OBJECT_REQUIRED = False # Whether the object field is required or note OBJECT_REQUIRED = False # Whether the object field is required or note
ALLOWED_OBJECT_TYPES: List[ActivityType] = [] # ALLOWED_OBJECT_TYPES: List[ActivityType] = []
ACTOR_REQUIRED = True # Most of the object requires an actor, so this flag in on by default ACTOR_REQUIRED = True # Most of the object requires an actor, so this flag in on by default
def __init__(self, **kwargs) -> None: def __init__(self, **kwargs) -> None:
@ -139,7 +217,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
self._data: Dict[str, Any] = { self._data: Dict[str, Any] = {
'type': self.ACTIVITY_TYPE.value 'type': self.ACTIVITY_TYPE.value
} }
logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs}') logger.debug(f'initializing a {self.ACTIVITY_TYPE.value} activity: {kwargs!r}')
# The id may not be present for new activities # The id may not be present for new activities
if 'id' in kwargs: if 'id' in kwargs:
@ -172,7 +250,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
self._data['object'] = obj self._data['object'] = obj
if '@context' not in kwargs: if '@context' not in kwargs:
self._data['@context'] = CTX_AS self._data['@context'] = CTX_AS
else: else:
self._data['@context'] = kwargs.pop('@context') self._data['@context'] = kwargs.pop('@context')
@ -187,7 +265,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
else: else:
self._data['@context'].append({'Hashtag': 'as:Hashtag', 'sensitive': 'as:sensitive'}) self._data['@context'].append({'Hashtag': 'as:Hashtag', 'sensitive': 'as:sensitive'})
# FIXME(tsileo): keys required for all subclasses? # FIXME(tsileo): keys required for some subclasses?
allowed_keys = None allowed_keys = None
try: try:
allowed_keys = self._init(**kwargs) allowed_keys = self._init(**kwargs)
@ -257,7 +335,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
except Exception: except Exception:
raise BadActivityError(f'failed to validate actor {obj!r}') raise BadActivityError(f'failed to validate actor {obj!r}')
if not actor or not 'id' in actor: if not actor or 'id' not in actor:
raise BadActivityError(f'invalid actor {actor}') raise BadActivityError(f'invalid actor {actor}')
return actor['id'] return actor['id']
@ -274,7 +352,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")!r}') raise UnexpectedActivityTypeError(f'invalid object type {obj.get("type")!r}')
p = parse_activity(obj) p = parse_activity(obj)
self.__obj: Optional[BaseActivity] = p self.__obj: Optional['BaseActivity'] = p
return p return p
def reset_object_cache(self) -> None: def reset_object_cache(self) -> None:
@ -326,13 +404,6 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
def _undo_inbox(self) -> None: def _undo_inbox(self) -> None:
raise NotImplementedError raise NotImplementedError
# FIXME(tsileo): delete these?
def _undo_should_purge_cache(self) -> bool:
raise NotImplementedError
def _should_purge_cache(self) -> bool:
raise NotImplementedError
def process_from_inbox(self) -> None: def process_from_inbox(self) -> None:
logger.debug(f'calling main process from inbox hook for {self}') logger.debug(f'calling main process from inbox hook for {self}')
actor = self.get_actor() actor = self.get_actor()
@ -340,12 +411,12 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
# Check for Block activity # Check for Block activity
# ABC # ABC
if self.outbox_is_blocked(actor.id): if self.outbox_is_blocked(actor.id):
# TODO(tsileo): raise ActorBlockedError # TODO(tsileo): raise ActorBlockedError?
logger.info(f'actor {actor!r} is blocked, dropping the received activity {self!r}') logger.info(f'actor {actor!r} is blocked, dropping the received activity {self!r}')
return return
# ABC # ABC
if self.inbox_get_by_id(self.id): if self.inbox_get_by_iri(self.id):
# The activity is already in the inbox # The activity is already in the inbox
logger.info(f'received duplicate activity {self}, dropping it') logger.info(f'received duplicate activity {self}, dropping it')
return return
@ -357,7 +428,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
logger.debug('pre process from inbox hook not implemented') logger.debug('pre process from inbox hook not implemented')
# ABC # ABC
self.inbox_create(self) self.inbox_new(self)
logger.info('activity {self!r} saved') logger.info('activity {self!r} saved')
try: try:
@ -371,7 +442,8 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
# Assign create a random ID # Assign create a random ID
obj_id = random_object_id() obj_id = random_object_id()
self.set_id(f'{ID}/outbox/{obj_id}', obj_id) # ABC
self.set_id(self.activity_url(obj_id), obj_id)
try: try:
self._pre_post_to_outbox() self._pre_post_to_outbox()
@ -380,11 +452,11 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
logger.debug('pre post to outbox hook not implemented') logger.debug('pre post to outbox hook not implemented')
# ABC # ABC
self.outbox_create(self) self.outbox_new(self)
recipients = self.recipients() recipients = self.recipients()
logger.info(f'recipients={recipients}') logger.info(f'recipients={recipients}')
activity = clean_activity(activity) activity = clean_activity(self.to_dict())
try: try:
self._post_to_outbox(obj_id, activity, recipients) self._post_to_outbox(obj_id, activity, recipients)
@ -399,9 +471,6 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
# ABC # ABC
self.post_to_remote_inbox(payload, recp) self.post_to_remote_inbox(payload, recp)
def _post_to_inbox(self, payload: str, to: str):
tasks.post_to_inbox.delay(payload, to)
def _recipients(self) -> List[str]: def _recipients(self) -> List[str]:
return [] return []
@ -423,7 +492,7 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
actor = recipient actor = recipient
else: else:
raw_actor = OBJECT_FETCHER.fetch(recipient) raw_actor = OBJECT_FETCHER.fetch(recipient)
if raw_actor['type'] == ActiivtyType.PERSON.name: if raw_actor['type'] == ActivityType.PERSON.name:
actor = Person(**raw_actor) actor = Person(**raw_actor)
if actor.endpoints: if actor.endpoints:
@ -437,14 +506,14 @@ class _BaseActivity(object, metaclass=_ActivityMeta):
# Is the activity a `Collection`/`OrderedCollection`? # Is the activity a `Collection`/`OrderedCollection`?
elif raw_actor['type'] in [ActivityType.COLLECTION.value, elif raw_actor['type'] in [ActivityType.COLLECTION.value,
ActivityType.ORDERED_COLLECTION.value]: ActivityType.ORDERED_COLLECTION.value]:
for item in parse_collection(raw_actor): for item in parse_collection(raw_actor):
if item in [ME, AS_PUBLIC]: if item in [actor_id, AS_PUBLIC]:
continue continue
try: try:
col_actor = Person(**OBJECT_FETCHER.fetch(item)) col_actor = Person(**OBJECT_FETCHER.fetch(item))
except NotAnActorError: except UnexpectedActivityTypeError:
pass logger.exception(f'failed to fetch actor {item!r}')
if col_actor.endpoints: if col_actor.endpoints:
shared_inbox = col_actor.endpoints.get('sharedInbox') shared_inbox = col_actor.endpoints.get('sharedInbox')
@ -503,7 +572,6 @@ class Follow(BaseActivity):
OBJECT_REQUIRED = True OBJECT_REQUIRED = True
ACTOR_REQUIRED = True ACTOR_REQUIRED = True
def _build_reply(self, reply_type: ActivityType) -> BaseActivity: def _build_reply(self, reply_type: ActivityType) -> BaseActivity:
if reply_type == ActivityType.ACCEPT: if reply_type == ActivityType.ACCEPT:
return Accept( return Accept(
@ -520,18 +588,18 @@ class Follow(BaseActivity):
accept = self.build_accept() accept = self.build_accept()
accept.post_to_outbox() accept.post_to_outbox()
remote_actor = self.get_actor().id remote_actor = self.get_actor()
# ABC # ABC
self.new_follower(remote_actor) self.new_follower(remote_actor)
def _undo_inbox(self) -> None: def _undo_inbox(self) -> None:
# ABC # ABC
self.undo_new_follower(self.get_actor().id) self.undo_new_follower(self.get_actor())
def _undo_outbox(self) -> None: def _undo_outbox(self) -> None:
# ABC # ABC
self.undo_new_following(self.get_object().id) self.undo_new_following(self.get_actor())
def build_accept(self) -> BaseActivity: def build_accept(self) -> BaseActivity:
return self._build_reply(ActivityType.ACCEPT) return self._build_reply(ActivityType.ACCEPT)
@ -550,7 +618,8 @@ class Accept(BaseActivity):
return [self.get_object().get_actor().id] return [self.get_object().get_actor().id]
def _pre_process_from_inbox(self) -> None: def _pre_process_from_inbox(self) -> None:
# FIXME(tsileo): ensure the actor match the object actor # FIXME(tsileo): ensure the actor match the object actor
pass
def _process_from_inbox(self) -> None: def _process_from_inbox(self) -> None:
# ABC # ABC
@ -591,9 +660,9 @@ class Undo(BaseActivity):
def _pre_post_to_outbox(self) -> None: def _pre_post_to_outbox(self) -> None:
"""Ensures an Undo activity references an activity owned by the instance.""" """Ensures an Undo activity references an activity owned by the instance."""
obj = self.get_object() # ABC
if not obj.id.startswith(ID): if not self.is_from_outbox(self):
raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') raise NotFromOutboxError(f'object {self!r} is not owned by this instance')
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
logger.debug('processing undo to outbox') logger.debug('processing undo to outbox')
@ -622,43 +691,20 @@ class Like(BaseActivity):
return [self.get_object().get_actor().id] return [self.get_object().get_actor().id]
def _process_from_inbox(self): def _process_from_inbox(self):
obj = self.get_object() # ABC
# Update the meta counter if the object is published by the server self.inbox_like(self)
# FIXME(tsileo): continue here
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$inc': {'meta.count_like': 1},
})
# XXX(tsileo): notification??
def _undo_inbox(self) -> None: def _undo_inbox(self) -> None:
obj = self.get_object() # ABC
# Update the meta counter if the object is published by the server self.inbox_undo_like(self)
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$inc': {'meta.count_like': -1},
})
def _undo_should_purge_cache(self) -> bool:
# If a like coutn was decremented, we need to purge the application cache
return self.get_object().id.startswith(BASE_URL)
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]): def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]):
obj = self.get_object() # ABC
# Unlikely, but an actor can like it's own post self.outbox_like(self)
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$inc': {'meta.count_like': 1},
})
# Keep track of the like we just performed
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': obj_id}})
def _undo_outbox(self) -> None: def _undo_outbox(self) -> None:
obj = self.get_object() # ABC
# Unlikely, but an actor can like it's own post self.outbox_undo_like(self)
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$inc': {'meta.count_like': -1},
})
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.liked': False}})
def build_undo(self) -> BaseActivity: def build_undo(self) -> BaseActivity:
return Undo(object=self.to_dict(embed=True, embed_object_id_only=True)) return Undo(object=self.to_dict(embed=True, embed_object_id_only=True))
@ -667,6 +713,8 @@ class Like(BaseActivity):
class Announce(BaseActivity): class Announce(BaseActivity):
ACTIVITY_TYPE = ActivityType.ANNOUNCE ACTIVITY_TYPE = ActivityType.ANNOUNCE
ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] ALLOWED_OBJECT_TYPES = [ActivityType.NOTE]
OBJECT_REQUIRED = True
ACTOR_REQUIRED = True
def _recipients(self) -> List[str]: def _recipients(self) -> List[str]:
recipients = [] recipients = []
@ -684,49 +732,21 @@ class Announce(BaseActivity):
f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message' f'received an Annouce referencing an OStatus notice ({self._data["object"]}), dropping the message'
) )
return return
# Save/cache the object, and make it part of the stream so we can fetch it
if isinstance(self._data['object'], str):
raw_obj = OBJECT_SERVICE.get(
self._data['object'],
reload_cache=True,
part_of_stream=True,
announce_published=self._data['published'],
)
obj = parse_activity(raw_obj)
else:
obj = self.get_object()
DB.outbox.update_one({'activity.object.id': obj.id}, { # ABC
'$inc': {'meta.count_boost': 1}, self.inbox_announce(self)
})
def _undo_inbox(self) -> None: def _undo_inbox(self) -> None:
obj = self.get_object() # ABC
# Update the meta counter if the object is published by the server self.inbox_undo_annnounce(self)
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$inc': {'meta.count_boost': -1},
})
def _undo_should_purge_cache(self) -> bool:
# If a like coutn was decremented, we need to purge the application cache
return self.get_object().id.startswith(BASE_URL)
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
if isinstance(self._data['object'], str): # ABC
# Put the object in the cache self.outbox_announce(self)
OBJECT_SERVICE.get(
self._data['object'],
reload_cache=True,
part_of_stream=True,
announce_published=self._data['published'],
)
obj = self.get_object()
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': obj_id}})
def _undo_outbox(self) -> None: def _undo_outbox(self) -> None:
obj = self.get_object() # ABC
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'meta.boosted': False}}) self.outbox_undo_announce(self)
def build_undo(self) -> BaseActivity: def build_undo(self) -> BaseActivity:
return Undo(object=self.to_dict(embed=True)) return Undo(object=self.to_dict(embed=True))
@ -738,9 +758,10 @@ class Delete(BaseActivity):
OBJECT_REQUIRED = True OBJECT_REQUIRED = True
def _get_actual_object(self) -> BaseActivity: def _get_actual_object(self) -> BaseActivity:
# FIXME(tsileo): overrides get_object instead?
obj = self.get_object() obj = self.get_object()
if obj.type_enum == ActivityType.TOMBSTONE: if obj.type_enum == ActivityType.TOMBSTONE:
obj = parse_activity(OBJECT_SERVICE.get(obj.id)) obj = parse_activity(OBJECT_FETCHER.fetch(obj.id))
return obj return obj
def _recipients(self) -> List[str]: def _recipients(self) -> List[str]:
@ -755,27 +776,27 @@ class Delete(BaseActivity):
raise BadActivityError(f'{actor!r} cannot delete {obj!r}') raise BadActivityError(f'{actor!r} cannot delete {obj!r}')
def _process_from_inbox(self) -> None: def _process_from_inbox(self) -> None:
DB.inbox.update_one({'activity.object.id': self.get_object().id}, {'$set': {'meta.deleted': True}}) # ABC
obj = self._get_actual_object() self.inbox_delete(self)
if obj.type_enum == ActivityType.NOTE: # FIXME(tsileo): handle the delete_threads here?
obj._delete_from_threads()
# TODO(tsileo): also purge the cache if it's a reply of a published activity
def _pre_post_to_outbox(self) -> None: def _pre_post_to_outbox(self) -> None:
"""Ensures the Delete activity references a activity from the outbox (i.e. owned by the instance).""" """Ensures the Delete activity references a activity from the outbox (i.e. owned by the instance)."""
obj = self._get_actual_object() obj = self._get_actual_object()
if not obj.id.startswith(ID): # ABC
if not self.is_from_outbox(self):
raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance')
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
DB.outbox.update_one({'activity.object.id': self.get_object().id}, {'$set': {'meta.deleted': True}}) # ABC
self.outbox_delete(self)
class Update(BaseActivity): class Update(BaseActivity):
ACTIVITY_TYPE = ActivityType.UPDATE ACTIVITY_TYPE = ActivityType.UPDATE
ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.PERSON] ALLOWED_OBJECT_TYPES = [ActivityType.NOTE, ActivityType.PERSON]
OBJECT_REQUIRED = True OBJECT_REQUIRED = True
ACTOR_REQUIRED = True
def _pre_process_from_inbox(self) -> None: def _pre_process_from_inbox(self) -> None:
"""Ensures an Update activity comes from the same actor as the updated activity.""" """Ensures an Update activity comes from the same actor as the updated activity."""
@ -785,53 +806,29 @@ class Update(BaseActivity):
raise BadActivityError(f'{actor!r} cannot update {obj!r}') raise BadActivityError(f'{actor!r} cannot update {obj!r}')
def _process_from_inbox(self): def _process_from_inbox(self):
obj = self.get_object() # ABC
if obj.type_enum == ActivityType.NOTE: self.inbox_update(self)
DB.inbox.update_one({'activity.object.id': obj.id}, {'$set': {'activity.object': obj.to_dict()}})
return
# If the object is a Person, it means the profile was updated, we just refresh our local cache
ACTOR_SERVICE.get(obj.id, reload_cache=True)
# TODO(tsileo): implements _should_purge_cache if it's a reply of a published activity (i.e. in the outbox)
def _pre_post_to_outbox(self) -> None: def _pre_post_to_outbox(self) -> None:
obj = self.get_object() # ABC
if not obj.id.startswith(ID): if not self.is_form_outbox(self):
raise NotFromOutboxError(f'object {obj["id"]} is not owned by this instance') raise NotFromOutboxError(f'object {self!r} is not owned by this instance')
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
obj = self._data['object'] # ABC
self.outbox_update(self)
update_prefix = 'activity.object.'
update: Dict[str, Any] = {'$set': dict(), '$unset': dict()}
update['$set'][f'{update_prefix}updated'] = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'
for k, v in obj.items():
if k in ['id', 'type']:
continue
if v is None:
update['$unset'][f'{update_prefix}{k}'] = ''
else:
update['$set'][f'{update_prefix}{k}'] = v
if len(update['$unset']) == 0:
del(update['$unset'])
print(f'updating note from outbox {obj!r} {update}')
logger.info(f'updating note from outbox {obj!r} {update}')
DB.outbox.update_one({'activity.object.id': obj['id']}, update)
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
# (create a new Update with the result of the update, and send it without saving it?)
class Create(BaseActivity): class Create(BaseActivity):
ACTIVITY_TYPE = ActivityType.CREATE ACTIVITY_TYPE = ActivityType.CREATE
ALLOWED_OBJECT_TYPES = [ActivityType.NOTE] ALLOWED_OBJECT_TYPES = [ActivityType.NOTE]
OBJECT_REQUIRED = True OBJECT_REQUIRED = True
ACTOR_REQUIRED = True
def _set_id(self, uri: str, obj_id: str) -> None: def _set_id(self, uri: str, obj_id: str) -> None:
self._data['object']['id'] = uri + '/activity' self._data['object']['id'] = uri + '/activity'
self._data['object']['url'] = ID + '/' + self.get_object().type.lower() + '/' + obj_id # ABC
self._data['object']['url'] = self.note_url(self)
self.reset_object_cache() self.reset_object_cache()
def _init(self, **kwargs): def _init(self, **kwargs):
@ -857,83 +854,25 @@ class Create(BaseActivity):
return recipients return recipients
def _update_threads(self) -> None:
logger.debug('_update_threads hook')
obj = self.get_object()
# TODO(tsileo): re-enable me
# tasks.fetch_og.delay('INBOX', self.id)
threads = []
reply = obj.get_local_reply()
print(f'initial_reply={reply}')
print(f'{obj}')
logger.debug(f'initial_reply={reply}')
reply_id = None
direct_reply = 1
while reply is not None:
if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': 1,
'meta.count_direct_reply': direct_reply,
},
'$addToSet': {'meta.thread_children': obj.id},
}):
DB.outbox.update_one({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': 1,
'meta.count_direct_reply': direct_reply,
},
'$addToSet': {'meta.thread_children': obj.id},
})
direct_reply = 0
reply_id = reply.id
reply = reply.get_local_reply()
logger.debug(f'next_reply={reply}')
threads.append(reply_id)
# FIXME(tsileo): obj.id is None!!
print(f'reply_id={reply_id} {obj.id} {obj._data} {self.id}')
if reply_id:
if not DB.inbox.find_one_and_update({'activity.object.id': obj.id}, {
'$set': {
'meta.thread_parents': threads,
'meta.thread_root_parent': reply_id,
},
}):
DB.outbox.update_one({'activity.object.id': obj.id}, {
'$set': {
'meta.thread_parents': threads,
'meta.thread_root_parent': reply_id,
},
})
logger.debug('_update_threads done')
def _process_from_inbox(self) -> None: def _process_from_inbox(self) -> None:
self._update_threads() # ABC
self.inbox_create(self)
def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None: def _post_to_outbox(self, obj_id: str, activity: ObjectType, recipients: List[str]) -> None:
self._update_threads() # ABC
self.outbox_create(self)
def _should_purge_cache(self) -> bool:
# TODO(tsileo): handle reply of a reply...
obj = self.get_object()
in_reply_to = obj.inReplyTo
if in_reply_to:
local_activity = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if local_activity:
return True
return False
class Tombstone(BaseActivity): class Tombstone(BaseActivity):
ACTIVITY_TYPE = ActivityType.TOMBSTONE ACTIVITY_TYPE = ActivityType.TOMBSTONE
ACTOR_REQUIRED = False
OBJECT_REQUIRED = False
class Note(BaseActivity): class Note(BaseActivity):
ACTIVITY_TYPE = ActivityType.NOTE ACTIVITY_TYPE = ActivityType.NOTE
ACTOR_REQUIRED = True
OBJECT_REQURIED = False
def _init(self, **kwargs): def _init(self, **kwargs):
print(self._data) print(self._data)
@ -947,11 +886,12 @@ class Note(BaseActivity):
# TODO(tsileo): audience support? # TODO(tsileo): audience support?
recipients: List[str] = [] recipients: List[str] = []
# FIXME(tsileo): re-add support for the PUBLIC_INSTANCES
# If the note is public, we publish it to the defined "public instances" # If the note is public, we publish it to the defined "public instances"
if AS_PUBLIC in self._data.get('to', []): # if AS_PUBLIC in self._data.get('to', []):
recipients.extend(PUBLIC_INSTANCES) # recipients.extend(PUBLIC_INSTANCES)
print('publishing to public instances') # print('publishing to public instances')
print(recipients) # print(recipients)
for field in ['to', 'cc', 'bto', 'bcc']: for field in ['to', 'cc', 'bto', 'bcc']:
if field in self._data: if field in self._data:
@ -959,59 +899,11 @@ class Note(BaseActivity):
return recipients return recipients
def _delete_from_threads(self) -> None:
logger.debug('_delete_from_threads hook')
reply = self.get_local_reply()
logger.debug(f'initial_reply={reply}')
direct_reply = -1
while reply is not None:
if not DB.inbox.find_one_and_update({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': -1,
'meta.count_direct_reply': direct_reply,
},
'$pull': {'meta.thread_children': self.id},
}):
DB.outbox.update_one({'activity.object.id': reply.id}, {
'$inc': {
'meta.count_reply': 1,
'meta.count_direct_reply': direct_reply,
},
'$pull': {'meta.thread_children': self.id},
})
direct_reply = 0
reply = reply.get_local_reply()
logger.debug(f'next_reply={reply}')
logger.debug('_delete_from_threads done')
return None
def get_local_reply(self) -> Optional[BaseActivity]:
"Find the note reply if any."""
in_reply_to = self.inReplyTo
if not in_reply_to:
# This is the root comment
return None
inbox_parent = DB.inbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if inbox_parent:
return parse_activity(inbox_parent['activity']['object'])
outbox_parent = DB.outbox.find_one({'activity.type': 'Create', 'activity.object.id': in_reply_to})
if outbox_parent:
return parse_activity(outbox_parent['activity']['object'])
# The parent is no stored on this instance
return None
def build_create(self) -> BaseActivity: def build_create(self) -> BaseActivity:
"""Wraps an activity in a Create activity.""" """Wraps an activity in a Create activity."""
create_payload = { create_payload = {
'object': self.to_dict(embed=True), 'object': self.to_dict(embed=True),
'actor': self.attributedTo or ME, 'actor': self.attributedTo,
} }
for field in ['published', 'to', 'bto', 'cc', 'bcc', 'audience']: for field in ['published', 'to', 'bto', 'cc', 'bcc', 'audience']:
if field in self._data: if field in self._data:
@ -1026,7 +918,7 @@ class Note(BaseActivity):
return Announce( return Announce(
object=self.id, object=self.id,
to=[AS_PUBLIC], to=[AS_PUBLIC],
cc=[ID+'/followers', self.attributedTo], cc=[self.follower_collection_id(self.get_actor()), self.attributedTo], # ABC
published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z', published=datetime.utcnow().replace(microsecond=0).isoformat() + 'Z',
) )

View file

@ -8,7 +8,11 @@ class Error(Exception):
"""HTTP-friendly base error, with a status code, a message and an optional payload.""" """HTTP-friendly base error, with a status code, a message and an optional payload."""
status_code = 400 status_code = 400
def __init__(self, message: str, status_code: Optional[int] = None, payload: Optional[Dict[str, Any]] = None) -> None: def __init__(
self, message: str,
status_code: Optional[int] = None,
payload: Optional[Dict[str, Any]] = None,
) -> None:
Exception.__init__(self) Exception.__init__(self)
self.message = message self.message = message
if status_code is not None: if status_code is not None:
@ -21,7 +25,9 @@ class Error(Exception):
return rv return rv
def __repr__(self) -> str: def __repr__(self) -> str:
return f'{self.__class__.__qualname__}({self.message!r}, payload={self.payload!r}, status_code={self.status_code})' return (
f'{self.__class__.__qualname__}({self.message!r}, payload={self.payload!r}, status_code={self.status_code})'
)
class ActorBlockedError(Error): class ActorBlockedError(Error):

View file

@ -2,12 +2,9 @@ import logging
from typing import Any from typing import Any
import requests import requests
from urllib.parse import urlparse
from Crypto.PublicKey import RSA
from .urlutils import check_url from .urlutils import check_url
from .errors import ActivityNotFoundError from .errors import ActivityNotFoundError
from .errors import UnexpectedActivityTypeError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -21,19 +18,22 @@ class DefaultRemoteObjectFetcher(object):
def fetch(self, iri): def fetch(self, iri):
check_url(iri) check_url(iri)
resp = requests.get(actor_url, headers={ resp = requests.get(iri, headers={
'Accept': 'application/activity+json', 'Accept': 'application/activity+json',
'User-Agent': self._user_agent, 'User-Agent': self._user_agent,
}) })
if resp.status_code == 404: if resp.status_code == 404:
raise ActivityNotFoundError(f'{actor_url} cannot be fetched, 404 not found error') raise ActivityNotFoundError(f'{iri} cannot be fetched, 404 not found error')
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
OBJECT_FETCHER = DefaultRemoteObjectFetcher() OBJECT_FETCHER = DefaultRemoteObjectFetcher()
def set_object_fetcher(object_fetcher: Any): def set_object_fetcher(object_fetcher: Any):
global OBJECT_FETCHER
OBJECT_FETCHER = object_fetcher OBJECT_FETCHER = object_fetcher

View file

@ -4,7 +4,6 @@ from typing import Dict
from typing import List from typing import List
from typing import Any from typing import Any
import requests
from .errors import RecursionLimitExceededError from .errors import RecursionLimitExceededError
from .errors import UnexpectedActivityTypeError from .errors import UnexpectedActivityTypeError
@ -21,10 +20,6 @@ def parse_collection(
raise RecursionLimitExceededError('recursion limit exceeded') raise RecursionLimitExceededError('recursion limit exceeded')
# Go through all the pages # Go through all the pages
headers = {'Accept': 'application/activity+json'}
if user_agent:
headers['User-Agent'] = user_agent
out: List[Any] = [] out: List[Any] = []
if url: if url:
payload = OBJECT_FETCHER.fetch(url) payload = OBJECT_FETCHER.fetch(url)