Tasks cleanup

This commit is contained in:
Thomas Sileo 2018-08-05 13:49:49 +02:00
parent 53e5a9e237
commit 5dce025700
3 changed files with 15 additions and 28 deletions

View file

@ -116,11 +116,6 @@ class MicroblogPubBackend(Backend):
} }
) )
self.save_cb(box, activity.id)
def set_save_cb(self, cb):
self.save_cb = cb
def followers(self) -> List[str]: def followers(self) -> List[str]:
q = { q = {
"box": Box.INBOX.value, "box": Box.INBOX.value,

11
app.py
View file

@ -79,17 +79,6 @@ from utils.lookup import lookup
from utils.media import Kind from utils.media import Kind
back = activitypub.MicroblogPubBackend() back = activitypub.MicroblogPubBackend()
def save_cb(box: Box, iri: str) -> None:
if box == Box.INBOX:
tasks.process_new_activity.delay(iri)
else:
tasks.cache_actor.delay(iri)
back.set_save_cb(save_cb)
ap.use_backend(back) ap.use_backend(back)
MY_PERSON = ap.Person(**ME) MY_PERSON = ap.Person(**ME)

View file

@ -22,6 +22,7 @@ from config import ID
from config import KEY from config import KEY
from config import MEDIA_CACHE from config import MEDIA_CACHE
from config import USER_AGENT from config import USER_AGENT
from config import BASE_URL
from utils import opengraph from utils import opengraph
from utils.media import Kind from utils.media import Kind
@ -36,16 +37,6 @@ back = activitypub.MicroblogPubBackend()
ap.use_backend(back) ap.use_backend(back)
def save_cb(box: Box, iri: str) -> None:
if box == Box.INBOX:
process_new_activity.delay(iri)
else:
cache_actor.delay(iri)
back.set_save_cb(save_cb)
MY_PERSON = ap.Person(**ME) MY_PERSON = ap.Person(**ME)
@ -70,6 +61,9 @@ def process_new_activity(self, iri: str) -> None:
# Most likely on OStatus notice # Most likely on OStatus notice
tag_stream = False tag_stream = False
should_delete = True should_delete = True
except (ActivityGoneError, ActivityNotFoundError):
# The announced activity is deleted/gone, drop it
should_delete = True
elif activity.has_type(ap.ActivityType.CREATE): elif activity.has_type(ap.ActivityType.CREATE):
note = activity.get_object() note = activity.get_object()
@ -112,6 +106,12 @@ def process_new_activity(self, iri: str) -> None:
# If the activity was originally forwarded, forward the delete too # If the activity was originally forwarded, forward the delete too
should_forward = True should_forward = True
elif activity.has_type(ap.ActivityType.LIKE):
if not activity.get_object_id.startswith(BASE_URL):
# We only want to keep a like if it's a like for a local activity
# (Pleroma relay the likes it received, we don't want to store them)
should_delete = True
if should_forward: if should_forward:
log.info(f"will forward {activity!r} to followers") log.info(f"will forward {activity!r} to followers")
forward_activity.delay(activity.id) forward_activity.delay(activity.id)
@ -132,7 +132,7 @@ def process_new_activity(self, iri: str) -> None:
) )
log.info(f"new activity {iri} processed") log.info(f"new activity {iri} processed")
if not should_delete: if not should_delete and not activity.has_type(ap.ActivityType.DELETE):
cache_actor.delay(iri) cache_actor.delay(iri)
except (ActivityGoneError, ActivityNotFoundError): except (ActivityGoneError, ActivityNotFoundError):
log.exception(f"dropping activity {iri}, skip processing") log.exception(f"dropping activity {iri}, skip processing")
@ -245,7 +245,7 @@ def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
) )
log.info(f"actor cached for {iri}") log.info(f"actor cached for {iri}")
if also_cache_attachments: if also_cache_attachments and activity.has_type(ap.ActivityType.CREATE):
cache_attachments.delay(iri) cache_attachments.delay(iri)
except (ActivityGoneError, ActivityNotFoundError): except (ActivityGoneError, ActivityNotFoundError):
@ -309,6 +309,8 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
log.info(f"received duplicate activity {activity!r}, dropping it") log.info(f"received duplicate activity {activity!r}, dropping it")
back.save(Box.INBOX, activity) back.save(Box.INBOX, activity)
process_new_activity.delay(activity.id)
log.info(f"spawning task for {activity!r}") log.info(f"spawning task for {activity!r}")
finish_post_to_inbox.delay(activity.id) finish_post_to_inbox.delay(activity.id)
@ -356,6 +358,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
activity.set_id(back.activity_url(obj_id), obj_id) activity.set_id(back.activity_url(obj_id), obj_id)
back.save(Box.OUTBOX, activity) back.save(Box.OUTBOX, activity)
cache_actor.delay(activity.id)
finish_post_to_outbox.delay(activity.id) finish_post_to_outbox.delay(activity.id)
return activity.id return activity.id