forked from forks/microblog.pub
Tweak the background tasks retry
This commit is contained in:
parent
7aff43f0f6
commit
a8baa88fb5
1 changed files with 13 additions and 11 deletions
24
tasks.py
24
tasks.py
|
@ -39,8 +39,10 @@ ap.use_backend(back)
|
||||||
|
|
||||||
MY_PERSON = ap.Person(**ME)
|
MY_PERSON = ap.Person(**ME)
|
||||||
|
|
||||||
|
MAX_RETRIES = 9
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12) # noqa: C901
|
|
||||||
|
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901
|
||||||
def process_new_activity(self, iri: str) -> None:
|
def process_new_activity(self, iri: str) -> None:
|
||||||
"""Process an activity received in the inbox"""
|
"""Process an activity received in the inbox"""
|
||||||
try:
|
try:
|
||||||
|
@ -141,7 +143,7 @@ def process_new_activity(self, iri: str) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12) # noqa: C901
|
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901
|
||||||
def fetch_og_metadata(self, iri: str) -> None:
|
def fetch_og_metadata(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -176,7 +178,7 @@ def fetch_og_metadata(self, iri: str) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12)
|
@app.task(bind=True, max_retries=MAX_RETRIES)
|
||||||
def cache_object(self, iri: str) -> None:
|
def cache_object(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -200,7 +202,7 @@ def cache_object(self, iri: str) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12)
|
@app.task(bind=True, max_retries=MAX_RETRIES)
|
||||||
def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
|
def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -256,7 +258,7 @@ def cache_actor(self, iri: str, also_cache_attachments: bool = True) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12)
|
@app.task(bind=True, max_retries=MAX_RETRIES)
|
||||||
def cache_attachments(self, iri: str) -> None:
|
def cache_attachments(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -288,7 +290,7 @@ def cache_attachments(self, iri: str) -> None:
|
||||||
|
|
||||||
log.info(f"attachments cached for {iri}")
|
log.info(f"attachments cached for {iri}")
|
||||||
|
|
||||||
except (ActivityGoneError, ActivityNotFoundError):
|
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||||
log.exception(f"dropping activity {iri}, no attachment caching")
|
log.exception(f"dropping activity {iri}, no attachment caching")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.exception(f"failed to cache attachments for {iri}")
|
log.exception(f"failed to cache attachments for {iri}")
|
||||||
|
@ -315,7 +317,7 @@ def post_to_inbox(activity: ap.BaseActivity) -> None:
|
||||||
finish_post_to_inbox.delay(activity.id)
|
finish_post_to_inbox.delay(activity.id)
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12) # noqa: C901
|
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901
|
||||||
def finish_post_to_inbox(self, iri: str) -> None:
|
def finish_post_to_inbox(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -343,7 +345,7 @@ def finish_post_to_inbox(self, iri: str) -> None:
|
||||||
back.inbox_undo_announce(MY_PERSON, obj)
|
back.inbox_undo_announce(MY_PERSON, obj)
|
||||||
elif obj.has_type(ap.ActivityType.FOLLOW):
|
elif obj.has_type(ap.ActivityType.FOLLOW):
|
||||||
back.undo_new_follower(MY_PERSON, obj)
|
back.undo_new_follower(MY_PERSON, obj)
|
||||||
except (ActivityGoneError, ActivityNotFoundError):
|
except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError):
|
||||||
log.exception(f"no retry")
|
log.exception(f"no retry")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.exception(f"failed to cache attachments for {iri}")
|
log.exception(f"failed to cache attachments for {iri}")
|
||||||
|
@ -364,7 +366,7 @@ def post_to_outbox(activity: ap.BaseActivity) -> str:
|
||||||
return activity.id
|
return activity.id
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12) # noqa:C901
|
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa:C901
|
||||||
def finish_post_to_outbox(self, iri: str) -> None:
|
def finish_post_to_outbox(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -405,7 +407,7 @@ def finish_post_to_outbox(self, iri: str) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12) # noqa:C901
|
@app.task(bind=True, max_retries=MAX_RETRIES) # noqa:C901
|
||||||
def forward_activity(self, iri: str) -> None:
|
def forward_activity(self, iri: str) -> None:
|
||||||
try:
|
try:
|
||||||
activity = ap.fetch_remote_activity(iri)
|
activity = ap.fetch_remote_activity(iri)
|
||||||
|
@ -421,7 +423,7 @@ def forward_activity(self, iri: str) -> None:
|
||||||
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
self.retry(exc=err, countdown=int(random.uniform(2, 4) ** self.request.retries))
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, max_retries=12)
|
@app.task(bind=True, max_retries=MAX_RETRIES)
|
||||||
def post_to_remote_inbox(self, payload: str, to: str) -> None:
|
def post_to_remote_inbox(self, payload: str, to: str) -> None:
|
||||||
try:
|
try:
|
||||||
log.info("payload=%s", payload)
|
log.info("payload=%s", payload)
|
||||||
|
|
Loading…
Reference in a new issue