Move the caching to celery

This commit is contained in:
Thomas Sileo 2018-07-11 21:53:47 +02:00
parent d6d20a972e
commit a36b56af4e
2 changed files with 57 additions and 9 deletions

View file

@ -18,7 +18,6 @@ from config import DB
from config import EXTRA_INBOXES from config import EXTRA_INBOXES
from config import ID from config import ID
from config import ME from config import ME
from config import MEDIA_CACHE
from config import USER_AGENT from config import USER_AGENT
from config import USERNAME from config import USERNAME
from little_boxes import activitypub as ap from little_boxes import activitypub as ap
@ -27,7 +26,6 @@ from little_boxes.activitypub import _to_list
from little_boxes.backend import Backend from little_boxes.backend import Backend
from little_boxes.errors import ActivityGoneError from little_boxes.errors import ActivityGoneError
from little_boxes.errors import Error from little_boxes.errors import Error
from utils.media import Kind
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -97,13 +95,8 @@ class MicroblogPubBackend(Backend):
} }
) )
# Generates thumbnails for the actor's icon and the attachments if any tasks.process_new_activity.delay(activity.id)
actor = activity.get_actor() tasks.cache_attachments(activity.id)
if actor.icon:
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
if activity.type == ap.ActivityType.CREATE.value:
for attachment in activity.get_object()._data.get("attachment", []):
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
@ensure_it_is_me @ensure_it_is_me
def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None: def outbox_new(self, as_actor: ap.Person, activity: ap.BaseActivity) -> None:

View file

@ -7,6 +7,7 @@ import requests
from celery import Celery from celery import Celery
from requests.exceptions import HTTPError from requests.exceptions import HTTPError
from little_boxes import activitypub as ap
from config import DB from config import DB
from config import HEADERS from config import HEADERS
from config import KEY from config import KEY
@ -14,6 +15,8 @@ from config import USER_AGENT
from little_boxes.httpsig import HTTPSigAuth from little_boxes.httpsig import HTTPSigAuth
from little_boxes.linked_data_sig import generate_signature from little_boxes.linked_data_sig import generate_signature
from utils.opengraph import fetch_og_metadata from utils.opengraph import fetch_og_metadata
from utils.media import Kind
from config import MEDIA_CACHE
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
app = Celery( app = Celery(
@ -22,6 +25,58 @@ app = Celery(
SigAuth = HTTPSigAuth(KEY) SigAuth = HTTPSigAuth(KEY)
@app.task(bind=True, max_retries=12)
def process_new_activity(self, iri: str) -> None:
try:
activity = ap.fetch_remote_activity(iri)
log.info(f"activity={activity!r}")
tag_stream = False
if activity.has_type(ap.ActivityType.ANNOUCE):
tag_stream = True
elif activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object()
if not note.inReplyTo:
tag_stream = True
log.info(f"{iri} tag_stream={tag_stream}")
DB.update_one({"remote_id": activity.id}, {"$set": {"meta.stream": tag_stream}})
log.info(f"new activity {iri} processed")
except Exception as err:
log.exception(f"failed to process new activity {iri}")
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
@app.task(bind=True, max_retries=12)
def cache_attachments(self, iri: str) -> None:
try:
activity = ap.fetch_remote_activity(iri)
log.info(f"activity={activity!r}")
# Generates thumbnails for the actor's icon and the attachments if any
actor = activity.get_actor()
# Update the cached actor
DB.actors.update_one(
{"remote_id": iri},
{"$set": {"remote_id": iri, "data": actor.to_dict(embed=True)}},
upsert=True,
)
if actor.icon:
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
if activity.has_type(ap.ActivityType.CREATE):
for attachment in activity.get_object()._data.get("attachment", []):
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
log.info(f"attachmwents cached for {iri}")
except Exception as err:
log.exception(f"failed to process new activity {iri}")
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
@app.task(bind=True, max_retries=12) @app.task(bind=True, max_retries=12)
def post_to_inbox(self, payload: str, to: str) -> None: def post_to_inbox(self, payload: str, to: str) -> None:
try: try: