From a36b56af4ebdf0899cb1d58b8325c5695541cfc7 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Wed, 11 Jul 2018 21:53:47 +0200 Subject: [PATCH] Move the caching to celery --- activitypub.py | 11 ++-------- tasks.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 9 deletions(-) diff --git a/activitypub.py b/activitypub.py index aeac7e9..f92769d 100644 --- a/activitypub.py +++ b/activitypub.py @@ -18,7 +18,6 @@ from config import DB from config import EXTRA_INBOXES from config import ID from config import ME -from config import MEDIA_CACHE from config import USER_AGENT from config import USERNAME from little_boxes import activitypub as ap @@ -27,7 +26,6 @@ from little_boxes.activitypub import _to_list from little_boxes.backend import Backend from little_boxes.errors import ActivityGoneError from little_boxes.errors import Error -from utils.media import Kind logger = logging.getLogger(__name__) @@ -97,13 +95,8 @@ class MicroblogPubBackend(Backend): } ) - # Generates thumbnails for the actor's icon and the attachments if any - actor = activity.get_actor() - if actor.icon: - MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON) - if activity.type == ap.ActivityType.CREATE.value: - for attachment in activity.get_object()._data.get("attachment", []): - MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) + tasks.process_new_activity.delay(activity.id) + tasks.cache_attachments(activity.id) @ensure_it_is_me def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: diff --git a/tasks.py b/tasks.py index a5c85db..79d2857 100644 --- a/tasks.py +++ b/tasks.py @@ -7,6 +7,7 @@ import requests from celery import Celery from requests.exceptions import HTTPError +from little_boxes import activitypub as ap from config import DB from config import HEADERS from config import KEY @@ -14,6 +15,8 @@ from config import USER_AGENT from little_boxes.httpsig import HTTPSigAuth from little_boxes.linked_data_sig import 
generate_signature
 from utils.opengraph import fetch_og_metadata
+from utils.media import Kind
+from config import MEDIA_CACHE
 
 log = logging.getLogger(__name__)
 app = Celery(
@@ -22,6 +25,78 @@ app = Celery(
 SigAuth = HTTPSigAuth(KEY)
 
 
+@app.task(bind=True, max_retries=12)
+def process_new_activity(self, iri: str) -> None:
+    """Decide whether a newly received activity belongs in the stream.
+
+    Fetches the activity behind ``iri`` and stores the flag ``meta.stream`` on
+    its database entry: True for an Announce, or for a Create whose object has
+    no ``inReplyTo``; False otherwise. Any failure is logged and the task is
+    retried after a randomized, exponentially growing countdown.
+    """
+    try:
+        activity = ap.fetch_remote_activity(iri)
+        log.info(f"activity={activity!r}")
+
+        tag_stream = False
+        if activity.has_type(ap.ActivityType.ANNOUNCE):
+            tag_stream = True
+        elif activity.has_type(ap.ActivityType.CREATE):
+            note = activity.get_object()
+            if not note.inReplyTo:
+                tag_stream = True
+
+        log.info(f"{iri} tag_stream={tag_stream}")
+        # NOTE(review): DB is a database handle (cf. DB.actors below), so the
+        # update must go through a collection; "inbox" is assumed -- confirm.
+        DB.inbox.update_one(
+            {"remote_id": activity.id}, {"$set": {"meta.stream": tag_stream}}
+        )
+
+        log.info(f"new activity {iri} processed")
+    except Exception as err:
+        log.exception(f"failed to process new activity {iri}")
+        self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
+
+
+@app.task(bind=True, max_retries=12)
+def cache_attachments(self, iri: str) -> None:
+    """Cache the actor and the media attached to an activity.
+
+    Fetches the activity behind ``iri``, upserts its actor into ``DB.actors``,
+    then hands the actor icon (if any) and, for a Create, every attachment URL
+    of the object to ``MEDIA_CACHE``. Any failure is logged and the task is
+    retried after a randomized, exponentially growing countdown.
+    """
+    try:
+        activity = ap.fetch_remote_activity(iri)
+        log.info(f"activity={activity!r}")
+
+        actor = activity.get_actor()
+
+        # Update the cached actor
+        # NOTE(review): the entry is keyed by the activity IRI, not the actor's
+        # own id -- confirm this is what readers of DB.actors expect.
+        DB.actors.update_one(
+            {"remote_id": iri},
+            {"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
+            upsert=True,
+        )
+
+        # Generates thumbnails for the actor's icon and the attachments if any
+        if actor.icon:
+            MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
+        if activity.has_type(ap.ActivityType.CREATE):
+            for attachment in activity.get_object()._data.get("attachment", []):
+                MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
+
+        log.info(f"attachments cached for {iri}")
+
+    except Exception as err:
+        log.exception(f"failed to cache attachments for {iri}")
+        self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
+
+
 @app.task(bind=True, max_retries=12)
 def post_to_inbox(self, payload: str, to: str) -> None:
     try: