More cleanup

This commit is contained in:
Thomas Sileo 2019-04-08 19:54:06 +02:00
parent 143b0953be
commit 8a57d0dfda
4 changed files with 76 additions and 96 deletions

View file

@ -18,6 +18,9 @@ reload-dev:
# docker build . -t microblogpub:latest # docker build . -t microblogpub:latest
docker-compose -f docker-compose-dev.yml up -d --force-recreate docker-compose -f docker-compose-dev.yml up -d --force-recreate
update-poussetaches:
git clone https://github.com/tsileo/poussetaches.git tmp_poussetaches && cd tmp_poussetaches && docker build . -t poussetaches:latest && cd - && rm -rf tmp_poussetaches
update: update:
git pull git pull
docker build . -t microblogpub:latest docker build . -t microblogpub:latest

View file

@ -1,6 +1,5 @@
import logging import logging
import os import os
import json
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Any from typing import Any
@ -79,6 +78,52 @@ class Box(Enum):
REPLIES = "replies" REPLIES = "replies"
def save(box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {"undo": False, "deleted": False},
}
)
def followers() -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
def following() -> List[str]:
q = {
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
def followers_as_recipients() -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
recipients = []
for doc in DB.activities.find(q):
recipients.append(
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
)
return list(set(recipients))
class MicroblogPubBackend(Backend): class MicroblogPubBackend(Backend):
"""Implements a Little Boxes backend, backed by MongoDB.""" """Implements a Little Boxes backend, backed by MongoDB."""
@ -104,73 +149,18 @@ class MicroblogPubBackend(Backend):
"""URL for activity link.""" """URL for activity link."""
return f"{BASE_URL}/note/{obj_id}" return f"{BASE_URL}/note/{obj_id}"
def save(self, box: Box, activity: ap.BaseActivity) -> None:
"""Custom helper for saving an activity to the DB."""
DB.activities.insert_one(
{
"box": box.value,
"activity": activity.to_dict(),
"type": _to_list(activity.type),
"remote_id": activity.id,
"meta": {"undo": False, "deleted": False},
}
)
def followers(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["actor"] for doc in DB.activities.find(q)]
def followers_as_recipients(self) -> List[str]:
q = {
"box": Box.INBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
recipients = []
for doc in DB.activities.find(q):
recipients.append(
doc["meta"]["actor"]["sharedInbox"] or doc["meta"]["actor"]["inbox"]
)
return list(set(recipients))
def following(self) -> List[str]:
q = {
"box": Box.OUTBOX.value,
"type": ap.ActivityType.FOLLOW.value,
"meta.undo": False,
}
return [doc["activity"]["object"] for doc in DB.activities.find(q)]
def parse_collection( def parse_collection(
self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None self, payload: Optional[Dict[str, Any]] = None, url: Optional[str] = None
) -> List[str]: ) -> List[str]:
"""Resolve/fetch a `Collection`/`OrderedCollection`.""" """Resolve/fetch a `Collection`/`OrderedCollection`."""
# Resolve internal collections via MongoDB directly # Resolve internal collections via MongoDB directly
if url == ID + "/followers": if url == ID + "/followers":
return self.followers() return followers()
elif url == ID + "/following": elif url == ID + "/following":
return self.following() return following()
return super().parse_collection(payload, url) return super().parse_collection(payload, url)
@ensure_it_is_me
def outbox_is_blocked(self, as_actor: ap.Person, actor_id: str) -> bool:
return bool(
DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor_id,
"meta.undo": False,
}
)
)
def _fetch_iri(self, iri: str) -> ap.ObjectType: def _fetch_iri(self, iri: str) -> ap.ObjectType:
if iri == ME["id"]: if iri == ME["id"]:
return ME return ME
@ -229,13 +219,6 @@ class MicroblogPubBackend(Backend):
return data return data
@ensure_it_is_me
def inbox_check_duplicate(self, as_actor: ap.Person, iri: str) -> bool:
return bool(DB.activities.find_one({"box": Box.INBOX.value, "remote_id": iri}))
def set_post_to_remote_inbox(self, cb):
self.post_to_remote_inbox_cb = cb
@ensure_it_is_me @ensure_it_is_me
def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None: def undo_new_follower(self, as_actor: ap.Person, follow: ap.Follow) -> None:
DB.activities.update_one( DB.activities.update_one(
@ -471,7 +454,7 @@ class MicroblogPubBackend(Backend):
) )
if not creply: if not creply:
# It means the activity is not in the inbox, and not in the outbox, we want to save it # It means the activity is not in the inbox, and not in the outbox, we want to save it
self.save(Box.REPLIES, reply) save(Box.REPLIES, reply)
new_threads.append(reply.id) new_threads.append(reply.id)
while reply is not None: while reply is not None:
@ -482,7 +465,7 @@ class MicroblogPubBackend(Backend):
reply = ap.fetch_remote_activity(root_reply) reply = ap.fetch_remote_activity(root_reply)
q = {"activity.object.id": root_reply} q = {"activity.object.id": root_reply}
if not DB.activities.count(q): if not DB.activities.count(q):
self.save(Box.REPLIES, reply) save(Box.REPLIES, reply)
new_threads.append(reply.id) new_threads.append(reply.id)
DB.activities.update_one( DB.activities.update_one(
@ -493,25 +476,6 @@ class MicroblogPubBackend(Backend):
{"$set": {"meta.thread_root_parent": root_reply}}, {"$set": {"meta.thread_root_parent": root_reply}},
) )
def post_to_outbox(self, activity: ap.BaseActivity) -> None:
if activity.has_type(ap.CREATE_TYPES):
activity = activity.build_create()
self.save(Box.OUTBOX, activity)
# Assign create a random ID
obj_id = self.random_object_id()
activity.set_id(self.activity_url(obj_id), obj_id)
recipients = activity.recipients()
logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity)
for recp in recipients:
logger.debug(f"posting to {recp}")
self.post_to_remote_inbox(self.get_actor(), payload, recp)
def gen_feed(): def gen_feed():
fg = FeedGenerator() fg = FeedGenerator()

27
app.py
View file

@ -66,6 +66,8 @@ import config
import tasks # noqa: here just for the migration # FIXME(tsileo): remove me import tasks # noqa: here just for the migration # FIXME(tsileo): remove me
from activitypub import Box from activitypub import Box
from activitypub import embed_collection from activitypub import embed_collection
from activitypub import save
from activitypub import followers_as_recipients
from config import USER_AGENT from config import USER_AGENT
from config import ADMIN_API_KEY from config import ADMIN_API_KEY
from config import BASE_URL from config import BASE_URL
@ -1643,7 +1645,7 @@ def inbox():
data["object"] data["object"]
): ):
logger.info(f"received a Delete for an actor {data!r}") logger.info(f"received a Delete for an actor {data!r}")
if get_backend().inbox_check_duplicate(MY_PERSON, data["id"]): if DB.activities.find_one({"box": Box.INBOX.value, "remote_id": data["id"]}):
# The activity is already in the inbox # The activity is already in the inbox
logger.info(f"received duplicate activity {data!r}, dropping it") logger.info(f"received duplicate activity {data!r}, dropping it")
@ -2295,7 +2297,9 @@ def task_finish_post_to_outbox():
elif obj.has_type(ap.ActivityType.ANNOUNCE): elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.outbox_undo_announce(MY_PERSON, obj) back.outbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW): elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_following(MY_PERSON, obj) DB.activities.update_one(
{"remote_id": obj.id}, {"$set": {"meta.undo": True}}
)
app.logger.info(f"recipients={recipients}") app.logger.info(f"recipients={recipients}")
activity = ap.clean_activity(activity.to_dict()) activity = ap.clean_activity(activity.to_dict())
@ -2345,7 +2349,9 @@ def task_finish_post_to_inbox():
elif obj.has_type(ap.ActivityType.ANNOUNCE): elif obj.has_type(ap.ActivityType.ANNOUNCE):
back.inbox_undo_announce(MY_PERSON, obj) back.inbox_undo_announce(MY_PERSON, obj)
elif obj.has_type(ap.ActivityType.FOLLOW): elif obj.has_type(ap.ActivityType.FOLLOW):
back.undo_new_follower(MY_PERSON, obj) DB.activities.update_one(
{"remote_id": obj.id}, {"$set": {"meta.undo": True}}
)
try: try:
invalidate_cache(activity) invalidate_cache(activity)
except Exception: except Exception:
@ -2367,7 +2373,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
obj_id = back.random_object_id() obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id) activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity) save(Box.OUTBOX, activity)
Tasks.cache_actor(activity.id) Tasks.cache_actor(activity.id)
Tasks.finish_post_to_outbox(activity.id) Tasks.finish_post_to_outbox(activity.id)
return activity.id return activity.id
@ -2376,7 +2382,14 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
def post_to_inbox(activity: ap.BaseActivity) -> None: def post_to_inbox(activity: ap.BaseActivity) -> None:
# Check for Block activity # Check for Block activity
actor = activity.get_actor() actor = activity.get_actor()
if back.outbox_is_blocked(MY_PERSON, actor.id): if DB.activities.find_one(
{
"box": Box.OUTBOX.value,
"type": ap.ActivityType.BLOCK.value,
"activity.object": actor.id,
"meta.undo": False,
}
):
app.logger.info( app.logger.info(
f"actor {actor!r} is blocked, dropping the received activity {activity!r}" f"actor {actor!r} is blocked, dropping the received activity {activity!r}"
) )
@ -2386,7 +2399,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
# The activity is already in the inbox # The activity is already in the inbox
app.logger.info(f"received duplicate activity {activity!r}, dropping it") app.logger.info(f"received duplicate activity {activity!r}, dropping it")
back.save(Box.INBOX, activity) save(Box.INBOX, activity)
Tasks.process_new_activity(activity.id) Tasks.process_new_activity(activity.id)
app.logger.info(f"spawning task for {activity!r}") app.logger.info(f"spawning task for {activity!r}")
@ -2654,7 +2667,7 @@ def task_forward_activity():
iri = task.payload iri = task.payload
try: try:
activity = ap.fetch_remote_activity(iri) activity = ap.fetch_remote_activity(iri)
recipients = back.followers_as_recipients() recipients = followers_as_recipients()
app.logger.debug(f"Forwarding {activity!r} to {recipients}") app.logger.debug(f"Forwarding {activity!r} to {recipients}")
activity = ap.clean_activity(activity.to_dict()) activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity) payload = json.dumps(activity)

View file

@ -312,7 +312,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
# The activity is already in the inbox # The activity is already in the inbox
log.info(f"received duplicate activity {activity!r}, dropping it") log.info(f"received duplicate activity {activity!r}, dropping it")
back.save(Box.INBOX, activity) activitypub.save(Box.INBOX, activity)
process_new_activity.delay(activity.id) process_new_activity.delay(activity.id)
log.info(f"spawning task for {activity!r}") log.info(f"spawning task for {activity!r}")
@ -387,7 +387,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
obj_id = back.random_object_id() obj_id = back.random_object_id()
activity.set_id(back.activity_url(obj_id), obj_id) activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity) activitypub.save(Box.OUTBOX, activity)
cache_actor.delay(activity.id) cache_actor.delay(activity.id)
finish_post_to_outbox.delay(activity.id) finish_post_to_outbox.delay(activity.id)
return activity.id return activity.id
@ -440,7 +440,7 @@ def finish_post_to_outbox(self, iri: str) -> None:
def forward_activity(self, iri: str) -> None: def forward_activity(self, iri: str) -> None:
try: try:
activity = ap.fetch_remote_activity(iri) activity = ap.fetch_remote_activity(iri)
recipients = back.followers_as_recipients() recipients = activitypub.followers_as_recipients()
log.debug(f"Forwarding {activity!r} to {recipients}") log.debug(f"Forwarding {activity!r} to {recipients}")
activity = ap.clean_activity(activity.to_dict()) activity = ap.clean_activity(activity.to_dict())
payload = json.dumps(activity) payload = json.dumps(activity)