diff --git a/activitypub.py b/activitypub.py index 0ff1e7a..8dd4399 100644 --- a/activitypub.py +++ b/activitypub.py @@ -21,11 +21,10 @@ from little_boxes import activitypub as ap from little_boxes.backend import Backend from little_boxes.collection import parse_collection as ap_parse_collection from little_boxes.errors import Error +from little_boxes.errors import ActivityNotFoundError logger = logging.getLogger(__name__) -MY_PERSON = ap.Person(**ME) - def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: """Helper for removing MongoDB's `_id` field.""" @@ -46,7 +45,7 @@ def ensure_it_is_me(f): """Method decorator used to track the events fired during tests.""" def wrapper(*args, **kwargs): - if args[1].id != MY_PERSON.id: + if args[1].id != ME["id"]: raise Error("unexpected actor") return f(*args, **kwargs) @@ -87,7 +86,22 @@ class MicroblogPubBackend(Backend): ) def fetch_iri(self, iri: str) -> ap.ObjectType: - # FIXME(tsileo): implements caching + if iri == ME["id"]: + return ME + + # Check if the activity is owned by this server + if iri.startswith(BASE_URL): + data = DB.outbox.find_one({"remote_id": iri}) + if not data: + raise ActivityNotFoundError(f"{iri} not found on this server") + return data["activity"] + + # Check if the activity is stored in the inbox + data = DB.inbox.find_one({"remote_id": iri}) + if data: + return data["activity"] + + # Fetch the URL via HTTP return super().fetch_iri(iri) @ensure_it_is_me @@ -149,7 +163,7 @@ class MicroblogPubBackend(Backend): ) @ensure_it_is_me - def outobx_like(self, as_actor: ap.Person, like: ap.Like) -> None: + def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None: obj = like.get_object() # Unlikely, but an actor can like it's own post DB.outbox.update_one( @@ -273,6 +287,12 @@ class MicroblogPubBackend(Backend): # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients # (create a new Update with the result of the update, and send it without saving it?) + def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: + pass + + def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: + pass + def gen_feed(): fg = FeedGenerator() diff --git a/app.py b/app.py index 33374a2..0ab5e29 100644 --- a/app.py +++ b/app.py @@ -36,9 +36,7 @@ from werkzeug.utils import secure_filename import activitypub import config -from activitypub import MY_PERSON from activitypub import embed_collection -from config import ACTOR_SERVICE from config import ADMIN_API_KEY from config import BASE_URL from config import DB @@ -49,7 +47,6 @@ from config import ID from config import JWT from config import KEY from config import ME -from config import OBJECT_SERVICE from config import PASS from config import USERNAME from config import VERSION @@ -63,10 +60,18 @@ from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import Error from little_boxes.errors import NotFromOutboxError from little_boxes.httpsig import HTTPSigAuth -from little_boxes.httpsig import verify_request +# from little_boxes.httpsig import verify_request from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_remote_follow_template from utils.key import get_secret_key +from utils.object_service import ObjectService + +OBJECT_SERVICE = ACTOR_SERVICE = ObjectService() + +back = activitypub.MicroblogPubBackend() +ap.use_backend(back) + +MY_PERSON = ap.Person(**ME) app = Flask(__name__) app.secret_key = get_secret_key("flask") diff --git a/config.py b/config.py index 8478186..44bca9c 100644 --- a/config.py +++ b/config.py @@ -84,9 +84,9 @@ JWT = JSONWebSignatureSerializer(JWT_SECRET) def _admin_jwt_token() -> str: - return JWT.dumps({"me": "ADMIN", "ts": datetime.now().timestamp()}).decode( + return JWT.dumps({"me": "ADMIN", "ts": datetime.now().timestamp()}).decode( # type: ignore "utf-8" - ) # type: ignore + ) ADMIN_API_KEY = get_secret_key("admin_api_key", _admin_jwt_token) diff --git a/utils/actor_service.py b/utils/actor_service.py deleted file mode 100644 index bb97131..0000000 --- a/utils/actor_service.py +++ /dev/null @@ -1,94 +0,0 @@ -import logging -from urllib.parse import urlparse - -import requests -from Crypto.PublicKey import RSA - -from .errors import ActivityNotFoundError -from .urlutils import check_url - -logger = logging.getLogger(__name__) - - -class NotAnActorError(Exception): - def __init__(self, activity): - self.activity = activity - - -class ActorService(object): - def __init__(self, user_agent, col, actor_id, actor_data, instances): - logger.debug(f"Initializing ActorService user_agent={user_agent}") - self._user_agent = user_agent - self._col = col - self._in_mem = {actor_id: actor_data} - self._instances = instances - self._known_instances = set() - - def _fetch(self, actor_url): - logger.debug(f"fetching remote object {actor_url}") - - check_url(actor_url) - - resp = requests.get( - actor_url, - headers={ - "Accept": "application/activity+json", - "User-Agent": self._user_agent, - }, - ) - if resp.status_code == 404: - raise ActivityNotFoundError( - f"{actor_url} cannot be fetched, 404 not found error" - ) - - resp.raise_for_status() - return resp.json() - - def get(self, actor_url, reload_cache=False): - logger.info(f"get actor {actor_url} (reload_cache={reload_cache})") - - if actor_url in self._in_mem: - return self._in_mem[actor_url] - - instance = urlparse(actor_url)._replace(path="", query="", fragment="").geturl() - if instance not in self._known_instances: - self._known_instances.add(instance) - if not self._instances.find_one({"instance": instance}): - self._instances.insert( - {"instance": instance, "first_object": actor_url} - ) - - if reload_cache: - actor = self._fetch(actor_url) - self._in_mem[actor_url] = actor - self._col.update( - {"actor_id": actor_url}, - {"$set": {"cached_response": actor}}, - upsert=True, - ) - return actor - - cached_actor = self._col.find_one({"actor_id": actor_url}) - if cached_actor: - return cached_actor["cached_response"] - - actor = self._fetch(actor_url) - if not "type" in actor: - raise NotAnActorError(None) - if actor["type"] != "Person": - raise NotAnActorError(actor) - - self._col.update( - {"actor_id": actor_url}, {"$set": {"cached_response": actor}}, upsert=True - ) - self._in_mem[actor_url] = actor - return actor - - def get_public_key(self, actor_url, reload_cache=False): - profile = self.get(actor_url, reload_cache=reload_cache) - pub = profile["publicKey"] - return pub["id"], RSA.importKey(pub["publicKeyPem"]) - - def get_inbox_url(self, actor_url, reload_cache=False): - profile = self.get(actor_url, reload_cache=reload_cache) - return profile.get("inbox") diff --git a/utils/object_service.py b/utils/object_service.py index 8ce8d11..e46f9b1 100644 --- a/utils/object_service.py +++ b/utils/object_service.py @@ -1,115 +1,21 @@ -from urllib.parse import urlparse +import logging -import requests +from little_boxes.activitypub import get_backend -from .errors import ActivityNotFoundError -from .urlutils import check_url +logger = logging.getLogger(__name__) class ObjectService(object): - def __init__(self, user_agent, col, inbox, outbox, instances): - self._user_agent = user_agent - self._col = col - self._inbox = inbox - self._outbox = outbox - self._instances = instances - self._known_instances = set() + def __init__(self): + logger.debug("Initializing ObjectService") + self._cache = {} - def _fetch_remote(self, object_id): - print(f"fetch remote {object_id}") - check_url(object_id) - resp = requests.get( - object_id, - headers={ - "Accept": "application/activity+json", - "User-Agent": self._user_agent, - }, - ) - if resp.status_code == 404: - raise ActivityNotFoundError( - f"{object_id} cannot be fetched, 404 error not found" - ) + def get(self, iri, reload_cache=False): + logger.info(f"get actor {iri} (reload_cache={reload_cache})") - resp.raise_for_status() - return resp.json() - - def _fetch(self, object_id): - instance = urlparse(object_id)._replace(path="", query="", fragment="").geturl() - if instance not in self._known_instances: - self._known_instances.add(instance) - if not self._instances.find_one({"instance": instance}): - self._instances.insert( - {"instance": instance, "first_object": object_id} - ) - - obj = self._inbox.find_one( - { - "$or": [ - {"remote_id": object_id}, - {"type": "Create", "activity.object.id": object_id}, - ] - } - ) - if obj: - if obj["remote_id"] == object_id: - return obj["activity"] - return obj["activity"]["object"] - - obj = self._outbox.find_one( - { - "$or": [ - {"remote_id": object_id}, - {"type": "Create", "activity.object.id": object_id}, - ] - } - ) - if obj: - if obj["remote_id"] == object_id: - return obj["activity"] - return obj["activity"]["object"] - - return self._fetch_remote(object_id) - - def get( - self, - object_id, - reload_cache=False, - part_of_stream=False, - announce_published=None, - ): - if reload_cache: - obj = self._fetch(object_id) - self._col.update( - {"object_id": object_id}, - { - "$set": { - "cached_object": obj, - "meta.part_of_stream": part_of_stream, - "meta.announce_published": announce_published, - } - }, - upsert=True, - ) - return obj - - cached_object = self._col.find_one({"object_id": object_id}) - if cached_object: - print(f"ObjectService: {cached_object}") - return cached_object["cached_object"] - - obj = self._fetch(object_id) - - self._col.update( - {"object_id": object_id}, - { - "$set": { - "cached_object": obj, - "meta.part_of_stream": part_of_stream, - "meta.announce_published": announce_published, - } - }, - upsert=True, - ) - # print(f'ObjectService: {obj}') + if not reload_cache and iri in self._cache: + return self._cache[iri] + obj = get_backend().fetch_iri(iri) + self._cache[iri] = obj return obj