microblog.pub/tasks.py

149 lines
4.9 KiB
Python
Raw Normal View History

2018-06-01 18:29:44 +00:00
import json
2018-05-18 18:41:41 +00:00
import logging
2018-06-16 20:02:10 +00:00
import os
2018-05-18 18:41:41 +00:00
import random
import requests
from celery import Celery
2018-07-11 21:22:47 +00:00
from little_boxes import activitypub as ap
2018-07-12 22:44:33 +00:00
from little_boxes.errors import ActivityGoneError
from little_boxes.errors import ActivityNotFoundError
2018-07-11 21:22:47 +00:00
from little_boxes.httpsig import HTTPSigAuth
from little_boxes.linked_data_sig import generate_signature
2018-05-18 18:41:41 +00:00
from requests.exceptions import HTTPError
2018-07-11 21:22:47 +00:00
import activitypub
2018-06-16 20:02:10 +00:00
from config import DB
2018-05-18 18:41:41 +00:00
from config import HEADERS
from config import KEY
2018-07-11 21:22:47 +00:00
from config import MEDIA_CACHE
2018-05-18 18:41:41 +00:00
from config import USER_AGENT
2018-07-11 19:53:47 +00:00
from utils.media import Kind
2018-05-18 18:41:41 +00:00
2018-05-28 17:46:23 +00:00
# Module-level logger, named after this module; used by every task below.
log = logging.getLogger(__name__)
2018-06-16 20:02:10 +00:00
# Celery application that the @app.task decorators below register against.
# The broker URL is read from the MICROBLOGPUB_AMQP_BROKER environment
# variable and falls back to "pyamqp://guest@localhost//" when it is unset.
app = Celery(
"tasks", broker=os.getenv("MICROBLOGPUB_AMQP_BROKER", "pyamqp://guest@localhost//")
)
2018-06-16 20:33:51 +00:00
# HTTP signature auth object built from the configured KEY; it is passed as
# the `auth=` argument of the outgoing POST in post_to_inbox.
SigAuth = HTTPSigAuth(KEY)
2018-05-18 18:41:41 +00:00
2018-07-11 21:22:47 +00:00
# Instantiate this project's backend and register it with little_boxes at
# import time, before any task calls into `ap` (e.g. ap.fetch_remote_activity).
back = activitypub.MicroblogPubBackend()
ap.use_backend(back)
2018-07-11 19:53:47 +00:00
@app.task(bind=True, max_retries=12)
def process_new_activity(self, iri: str) -> None:
    """Fetch a newly received activity and record whether it belongs in the stream.

    ``Announce`` activities are always flagged for the stream. ``Create``
    activities are flagged only when the created object has a falsy
    ``inReplyTo``. Every other activity type is flagged ``False``. The flag is
    written to ``meta.stream`` on the ``DB.activities`` document whose
    ``remote_id`` equals the fetched activity's id, and ``cache_actor`` is
    then queued for the same IRI.

    Args:
        iri: IRI of the activity to fetch and process.

    A gone or missing activity is logged and dropped without retrying. Any
    other exception is logged and the task is retried with a randomised
    countdown that grows exponentially with the retry count.
    """
    try:
        activity = ap.fetch_remote_activity(iri)
        log.info(f"activity={activity!r}")

        # TODO(review): a check for "is the activity expected?" (based on
        # ap.get_backend().following()) was left disabled here.

        if activity.has_type(ap.ActivityType.ANNOUNCE):
            tag_stream = True
        elif activity.has_type(ap.ActivityType.CREATE):
            # Only objects that are not replies go to the stream.
            tag_stream = not activity.get_object().inReplyTo
        else:
            tag_stream = False
        log.info(f"{iri} tag_stream={tag_stream}")

        selector = {"remote_id": activity.id}
        change = {"$set": {"meta.stream": tag_stream}}
        DB.activities.update_one(selector, change)
        log.info(f"new activity {iri} processed")

        # Continue the pipeline asynchronously with the actor caching task.
        cache_actor.delay(iri)
    except (ActivityGoneError, ActivityNotFoundError):
        log.exception(f"dropping activity {iri}, skip processing")
    except Exception as err:
        log.exception(f"failed to process new activity {iri}")
        # Random base in [2, 4) raised to the number of retries so far.
        backoff = int(random.uniform(2, 4) ** self.request.retries)
        self.retry(exc=err, countdown=backoff)
2018-07-19 23:12:02 +00:00
@app.task(bind=True, max_retries=12)
def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
    """Store the actor metadata of an activity on its ``DB.activities`` document.

    The activity is fetched, its actor is converted with
    ``activitypub._actor_to_meta`` and the result is written to ``meta.actor``
    on the document whose ``remote_id`` equals ``iri``.

    Args:
        iri: IRI of the activity whose actor should be cached.
        also_cache_attachments: When true, queue ``cache_attachments`` for the
            same IRI once the actor has been stored.

    If the activity is gone or cannot be found, the document is flagged with
    ``meta.deleted = True`` and the task ends without retrying. Any other
    exception is logged and the task is retried with a randomised countdown
    that grows exponentially with the retry count.
    """
    try:
        activity = ap.fetch_remote_activity(iri)
        log.info(f"activity={activity!r}")

        # Cache the actor info on the activity document.
        actor_meta = activitypub._actor_to_meta(activity.get_actor())
        DB.activities.update_one(
            {"remote_id": iri},
            {"$set": {"meta.actor": actor_meta}},
        )
        log.info(f"actor cached for {iri}")

        if also_cache_attachments:
            cache_attachments.delay(iri)
    except (ActivityGoneError, ActivityNotFoundError):
        deleted_flag = {"$set": {"meta.deleted": True}}
        DB.activities.update_one({"remote_id": iri}, deleted_flag)
        log.exception(f"flagging activity {iri} as deleted, no actor caching")
    except Exception as err:
        log.exception(f"failed to cache actor for {iri}")
        # Random base in [2, 4) raised to the number of retries so far.
        backoff = int(random.uniform(2, 4) ** self.request.retries)
        self.retry(exc=err, countdown=backoff)
2018-07-11 19:53:47 +00:00
@app.task(bind=True, max_retries=12)
def cache_attachments(self, iri: str) -> None:
    """Cache the media related to an activity: actor icon and attachments.

    The activity is fetched and its actor is upserted into ``DB.actors``. The
    actor's icon, if any, is then handed to ``MEDIA_CACHE`` as
    ``Kind.ACTOR_ICON``. For ``Create`` activities every entry of the
    object's ``attachment`` list is handed over as ``Kind.ATTACHMENT``.

    Args:
        iri: IRI of the activity whose media should be cached.

    A gone or missing activity is logged and dropped without retrying. Any
    other exception is logged and the task is retried with a randomised
    countdown that grows exponentially with the retry count.
    """
    try:
        activity = ap.fetch_remote_activity(iri)
        log.info(f"activity={activity!r}")

        # The actor supplies the icon; attachments come from the object below.
        actor = activity.get_actor()

        # Update the cached actor.
        # NOTE(review): the document is keyed by the *activity* IRI, not by an
        # actor identifier -- confirm this is intended.
        actor_doc = {"remote_id": iri, "data": actor.to_dict(embed=True)}
        DB.actors.update_one({"remote_id": iri}, {"$set": actor_doc}, upsert=True)

        if actor.icon:
            MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)

        if activity.has_type(ap.ActivityType.CREATE):
            attachments = activity.get_object()._data.get("attachment", [])
            for item in attachments:
                MEDIA_CACHE.cache(item["url"], Kind.ATTACHMENT)

        log.info(f"attachments cached for {iri}")
    except (ActivityGoneError, ActivityNotFoundError):
        log.exception(f"dropping activity {iri}, no attachment caching")
    except Exception as err:
        log.exception(f"failed to cache attachments for {iri}")
        # Random base in [2, 4) raised to the number of retries so far.
        backoff = int(random.uniform(2, 4) ** self.request.retries)
        self.retry(exc=err, countdown=backoff)
2018-05-18 18:41:41 +00:00
@app.task(bind=True, max_retries=12)
def post_to_inbox(self, payload: str, to: str) -> None:
    """Sign a JSON-encoded activity and POST it to a remote inbox.

    Args:
        payload: JSON document (as a string) to sign and deliver.
        to: URL the signed document is POSTed to.

    The request is authenticated with ``SigAuth`` and sent with the second
    entry of ``HEADERS`` as both ``Content-Type`` and ``Accept``.

    On an HTTP error status:
      * 4xx (client error): logged and dropped. Retrying the same request
        is not expected to succeed.
      * anything else: the task is retried with a randomised countdown that
        grows exponentially with the retry count.

    Only ``requests.exceptions.HTTPError`` is handled here; other exceptions
    propagate to Celery unchanged.
    """
    try:
        log.info("payload=%s", payload)
        log.info("generating sig")
        signed_payload = json.loads(payload)
        # Return value is unused; presumably the signature is added to the
        # dict in place -- confirm against little_boxes.linked_data_sig.
        generate_signature(signed_payload, KEY)
        log.info("to=%s", to)
        resp = requests.post(
            to,
            data=json.dumps(signed_payload),
            auth=SigAuth,
            headers={
                "Content-Type": HEADERS[1],
                "Accept": HEADERS[1],
                "User-Agent": USER_AGENT,
            },
        )
        log.info("resp=%s", resp)
        log.info("resp_body=%s", resp.text)
        resp.raise_for_status()
    except HTTPError as err:
        log.exception("request failed")
        # Fixed: the previous test `400 >= status >= 499` could never be
        # true, so client errors were retried instead of being dropped.
        if 400 <= err.response.status_code <= 499:
            log.info("client error, no retry")
            return
        self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))