Switched little_boxes

This commit is contained in:
Thomas Sileo 2018-06-17 20:51:23 +02:00
parent 6220064951
commit 7f65fdfc90
5 changed files with 48 additions and 211 deletions

View file

@ -21,11 +21,10 @@ from little_boxes import activitypub as ap
from little_boxes.backend import Backend from little_boxes.backend import Backend
from little_boxes.collection import parse_collection as ap_parse_collection from little_boxes.collection import parse_collection as ap_parse_collection
from little_boxes.errors import Error from little_boxes.errors import Error
from little_boxes.errors import ActivityNotFoundError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MY_PERSON = ap.Person(**ME)
def _remove_id(doc: ap.ObjectType) -> ap.ObjectType: def _remove_id(doc: ap.ObjectType) -> ap.ObjectType:
"""Helper for removing MongoDB's `_id` field.""" """Helper for removing MongoDB's `_id` field."""
@ -46,7 +45,7 @@ def ensure_it_is_me(f):
"""Method decorator used to track the events fired during tests.""" """Method decorator used to track the events fired during tests."""
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
if args[1].id != MY_PERSON.id: if args[1].id != ME["id"]:
raise Error("unexpected actor") raise Error("unexpected actor")
return f(*args, **kwargs) return f(*args, **kwargs)
@ -87,7 +86,22 @@ class MicroblogPubBackend(Backend):
) )
def fetch_iri(self, iri: str) -> ap.ObjectType: def fetch_iri(self, iri: str) -> ap.ObjectType:
# FIXME(tsileo): implements caching if iri == ME["id"]:
return ME
# Check if the activity is owned by this server
if iri.startswith(BASE_URL):
data = DB.outbox.find_one({"remote_id": iri})
if not data:
raise ActivityNotFoundError(f"{iri} not found on this server")
return data["activity"]
# Check if the activity is stored in the inbox
data = DB.inbox.find_one({"remote_id": iri})
if data:
return data["activity"]
# Fetch the URL via HTTP
return super().fetch_iri(iri) return super().fetch_iri(iri)
@ensure_it_is_me @ensure_it_is_me
@ -149,7 +163,7 @@ class MicroblogPubBackend(Backend):
) )
@ensure_it_is_me @ensure_it_is_me
def outobx_like(self, as_actor: ap.Person, like: ap.Like) -> None: def outbox_like(self, as_actor: ap.Person, like: ap.Like) -> None:
obj = like.get_object() obj = like.get_object()
# Unlikely, but an actor can like it's own post # Unlikely, but an actor can like it's own post
DB.outbox.update_one( DB.outbox.update_one(
@ -273,6 +287,12 @@ class MicroblogPubBackend(Backend):
# FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients # FIXME(tsileo): should send an Update (but not a partial one, to all the note's recipients
# (create a new Update with the result of the update, and send it without saving it?) # (create a new Update with the result of the update, and send it without saving it?)
def outbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
pass
def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None:
pass
def gen_feed(): def gen_feed():
fg = FeedGenerator() fg = FeedGenerator()

13
app.py
View file

@ -36,9 +36,7 @@ from werkzeug.utils import secure_filename
import activitypub import activitypub
import config import config
from activitypub import MY_PERSON
from activitypub import embed_collection from activitypub import embed_collection
from config import ACTOR_SERVICE
from config import ADMIN_API_KEY from config import ADMIN_API_KEY
from config import BASE_URL from config import BASE_URL
from config import DB from config import DB
@ -49,7 +47,6 @@ from config import ID
from config import JWT from config import JWT
from config import KEY from config import KEY
from config import ME from config import ME
from config import OBJECT_SERVICE
from config import PASS from config import PASS
from config import USERNAME from config import USERNAME
from config import VERSION from config import VERSION
@ -63,10 +60,18 @@ from little_boxes.errors import ActivityNotFoundError
from little_boxes.errors import Error from little_boxes.errors import Error
from little_boxes.errors import NotFromOutboxError from little_boxes.errors import NotFromOutboxError
from little_boxes.httpsig import HTTPSigAuth from little_boxes.httpsig import HTTPSigAuth
from little_boxes.httpsig import verify_request # from little_boxes.httpsig import verify_request
from little_boxes.webfinger import get_actor_url from little_boxes.webfinger import get_actor_url
from little_boxes.webfinger import get_remote_follow_template from little_boxes.webfinger import get_remote_follow_template
from utils.key import get_secret_key from utils.key import get_secret_key
from utils.object_service import ObjectService
OBJECT_SERVICE = ACTOR_SERVICE = ObjectService()
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)
MY_PERSON = ap.Person(**ME)
app = Flask(__name__) app = Flask(__name__)
app.secret_key = get_secret_key("flask") app.secret_key = get_secret_key("flask")

View file

@ -84,9 +84,9 @@ JWT = JSONWebSignatureSerializer(JWT_SECRET)
def _admin_jwt_token() -> str: def _admin_jwt_token() -> str:
return JWT.dumps({"me": "ADMIN", "ts": datetime.now().timestamp()}).decode( return JWT.dumps({"me": "ADMIN", "ts": datetime.now().timestamp()}).decode( # type: ignore
"utf-8" "utf-8"
) # type: ignore )
ADMIN_API_KEY = get_secret_key("admin_api_key", _admin_jwt_token) ADMIN_API_KEY = get_secret_key("admin_api_key", _admin_jwt_token)

View file

@ -1,94 +0,0 @@
import logging
from urllib.parse import urlparse
import requests
from Crypto.PublicKey import RSA
from .errors import ActivityNotFoundError
from .urlutils import check_url
logger = logging.getLogger(__name__)
class NotAnActorError(Exception):
def __init__(self, activity):
self.activity = activity
class ActorService(object):
def __init__(self, user_agent, col, actor_id, actor_data, instances):
logger.debug(f"Initializing ActorService user_agent={user_agent}")
self._user_agent = user_agent
self._col = col
self._in_mem = {actor_id: actor_data}
self._instances = instances
self._known_instances = set()
def _fetch(self, actor_url):
logger.debug(f"fetching remote object {actor_url}")
check_url(actor_url)
resp = requests.get(
actor_url,
headers={
"Accept": "application/activity+json",
"User-Agent": self._user_agent,
},
)
if resp.status_code == 404:
raise ActivityNotFoundError(
f"{actor_url} cannot be fetched, 404 not found error"
)
resp.raise_for_status()
return resp.json()
def get(self, actor_url, reload_cache=False):
logger.info(f"get actor {actor_url} (reload_cache={reload_cache})")
if actor_url in self._in_mem:
return self._in_mem[actor_url]
instance = urlparse(actor_url)._replace(path="", query="", fragment="").geturl()
if instance not in self._known_instances:
self._known_instances.add(instance)
if not self._instances.find_one({"instance": instance}):
self._instances.insert(
{"instance": instance, "first_object": actor_url}
)
if reload_cache:
actor = self._fetch(actor_url)
self._in_mem[actor_url] = actor
self._col.update(
{"actor_id": actor_url},
{"$set": {"cached_response": actor}},
upsert=True,
)
return actor
cached_actor = self._col.find_one({"actor_id": actor_url})
if cached_actor:
return cached_actor["cached_response"]
actor = self._fetch(actor_url)
if not "type" in actor:
raise NotAnActorError(None)
if actor["type"] != "Person":
raise NotAnActorError(actor)
self._col.update(
{"actor_id": actor_url}, {"$set": {"cached_response": actor}}, upsert=True
)
self._in_mem[actor_url] = actor
return actor
def get_public_key(self, actor_url, reload_cache=False):
profile = self.get(actor_url, reload_cache=reload_cache)
pub = profile["publicKey"]
return pub["id"], RSA.importKey(pub["publicKeyPem"])
def get_inbox_url(self, actor_url, reload_cache=False):
profile = self.get(actor_url, reload_cache=reload_cache)
return profile.get("inbox")

View file

@ -1,115 +1,21 @@
from urllib.parse import urlparse import logging
import requests from little_boxes.activitypub import get_backend
from .errors import ActivityNotFoundError logger = logging.getLogger(__name__)
from .urlutils import check_url
class ObjectService(object): class ObjectService(object):
def __init__(self, user_agent, col, inbox, outbox, instances): def __init__(self):
self._user_agent = user_agent logger.debug("Initializing ObjectService")
self._col = col self._cache = {}
self._inbox = inbox
self._outbox = outbox
self._instances = instances
self._known_instances = set()
def _fetch_remote(self, object_id): def get(self, iri, reload_cache=False):
print(f"fetch remote {object_id}") logger.info(f"get actor {iri} (reload_cache={reload_cache})")
check_url(object_id)
resp = requests.get(
object_id,
headers={
"Accept": "application/activity+json",
"User-Agent": self._user_agent,
},
)
if resp.status_code == 404:
raise ActivityNotFoundError(
f"{object_id} cannot be fetched, 404 error not found"
)
resp.raise_for_status() if not reload_cache and iri in self._cache:
return resp.json() return self._cache[iri]
def _fetch(self, object_id):
instance = urlparse(object_id)._replace(path="", query="", fragment="").geturl()
if instance not in self._known_instances:
self._known_instances.add(instance)
if not self._instances.find_one({"instance": instance}):
self._instances.insert(
{"instance": instance, "first_object": object_id}
)
obj = self._inbox.find_one(
{
"$or": [
{"remote_id": object_id},
{"type": "Create", "activity.object.id": object_id},
]
}
)
if obj:
if obj["remote_id"] == object_id:
return obj["activity"]
return obj["activity"]["object"]
obj = self._outbox.find_one(
{
"$or": [
{"remote_id": object_id},
{"type": "Create", "activity.object.id": object_id},
]
}
)
if obj:
if obj["remote_id"] == object_id:
return obj["activity"]
return obj["activity"]["object"]
return self._fetch_remote(object_id)
def get(
self,
object_id,
reload_cache=False,
part_of_stream=False,
announce_published=None,
):
if reload_cache:
obj = self._fetch(object_id)
self._col.update(
{"object_id": object_id},
{
"$set": {
"cached_object": obj,
"meta.part_of_stream": part_of_stream,
"meta.announce_published": announce_published,
}
},
upsert=True,
)
return obj
cached_object = self._col.find_one({"object_id": object_id})
if cached_object:
print(f"ObjectService: {cached_object}")
return cached_object["cached_object"]
obj = self._fetch(object_id)
self._col.update(
{"object_id": object_id},
{
"$set": {
"cached_object": obj,
"meta.part_of_stream": part_of_stream,
"meta.announce_published": announce_published,
}
},
upsert=True,
)
# print(f'ObjectService: {obj}')
obj = get_backend().fetch_iri(iri)
self._cache[iri] = obj
return obj return obj