diff --git a/gc.py b/activity_gc.py similarity index 56% rename from gc.py rename to activity_gc.py index 7e6f876..318b818 100644 --- a/gc.py +++ b/activity_gc.py @@ -1,16 +1,19 @@ import logging from datetime import datetime from datetime import timedelta +from time import perf_counter +from typing import Any +from typing import Dict from typing import List from little_boxes import activitypub as ap import activitypub from activitypub import Box +from config import DAYS_TO_KEEP from config import ID from config import ME from config import MEDIA_CACHE -from config import DAYS_TO_KEEP from utils.migrations import DB back = activitypub.MicroblogPubBackend() @@ -45,20 +48,29 @@ def threads_of_interest() -> List[str]: return list(out) -def perform() -> None: +def _keep(data: Dict[str, Any]): + DB.activities.update_one({"_id": data["_id"]}, {"$set": {"meta.gc_keep": True}}) + + +def perform() -> None: # noqa: C901 + start = perf_counter() d = (datetime.utcnow() - timedelta(days=DAYS_TO_KEEP)).strftime("%Y-%m-%d") toi = threads_of_interest() logger.info(f"thread_of_interest={toi!r}") + create_deleted = 0 + create_count = 0 # Go over the old Create activities for data in DB.activities.find( { "box": Box.INBOX.value, "type": ap.ActivityType.CREATE.value, "activity.published": {"$lt": d}, + "meta.gc_keep": {"$exists": False}, } - ): + ).limit(500): try: + create_count += 1 remote_id = data["remote_id"] meta = data["meta"] activity = ap.parse_activity(data["activity"]) @@ -66,6 +78,7 @@ def perform() -> None: # This activity has been bookmarked, keep it if meta.get("bookmarked"): + _keep(data) continue # Inspect the object @@ -73,21 +86,25 @@ def perform() -> None: # This activity mentions the server actor, keep it if obj.has_mention(ID): + _keep(data) continue # This activity is a direct reply of one the server actor activity, keep it in_reply_to = obj.get_in_reply_to() if in_reply_to and in_reply_to.startswith(ID): + _keep(data) continue # This activity is part of a thread we want to keep, keep it if in_reply_to and meta.get("thread_root_parent"): thread_root_parent = meta["thread_root_parent"] if thread_root_parent.startswith(ID) or thread_root_parent in toi: + _keep(data) continue # This activity was boosted or liked, keep it if meta.get("boosted") or meta.get("liked"): + _keep(data) continue # TODO(tsileo): remove after tests @@ -95,6 +112,7 @@ def perform() -> None: logger.warning( f"{activity!r} would not have been deleted, skipping for now" ) + _keep(data) continue # Delete the cached attachment @@ -103,5 +121,62 @@ def perform() -> None: # Delete the activity DB.activities.delete_one({"_id": data["_id"]}) + create_deleted += 1 except Exception: logger.exception(f"failed to process {data!r}") + + after_gc_create = perf_counter() + time_to_gc_create = start - after_gc_create + logger.info( + f"{time_to_gc_create:.2f} seconds to analyze {create_count} Create, {create_deleted} deleted" + ) + + announce_count = 0 + announce_deleted = 0 + # Go over the old Create activities + for data in DB.activities.find( + { + "box": Box.INBOX.value, + "type": ap.ActivityType.ANNOUNCE.value, + "activity.published": {"$lt": d}, + "meta.gc_keep": {"$exists": False}, + } + ).limit(500): + try: + announce_count += 1 + remote_id = data["remote_id"] + meta = data["meta"] + activity = ap.parse_activity(data["activity"]) + logger.info(f"activity={activity!r}") + + # This activity has been bookmarked, keep it + if meta.get("bookmarked"): + _keep(data) + continue + + object_id = activity.get_object_id() + + # This announce is for a local activity (i.e. from the outbox), keep it + if object_id.startswith(ID): + _keep(data) + continue + + for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}): + MEDIA_CACHE.fs.delete(grid_item._id) + + # TODO(tsileo): here for legacy reason, this needs to be removed at some point + for grid_item in MEDIA_CACHE.fs.find({"remote_id": object_id}): + MEDIA_CACHE.fs.delete(grid_item._id) + + # Delete the activity + DB.activities.delete_one({"_id": data["_id"]}) + + announce_deleted += 1 + except Exception: + logger.exception(f"failed to process {data!r}") + + after_gc_announce = perf_counter() + time_to_gc_announce = after_gc_create - after_gc_announce + logger.info( + f"{time_to_gc_announce:.2f} seconds to analyze {announce_count} Announce, {announce_deleted} deleted" + ) diff --git a/app.py b/app.py index ee547ef..43bc968 100644 --- a/app.py +++ b/app.py @@ -58,6 +58,7 @@ from requests.exceptions import HTTPError from u2flib_server import u2f from werkzeug.utils import secure_filename +import activity_gc import activitypub import config from activitypub import Box @@ -116,6 +117,7 @@ root_logger = logging.getLogger() if os.getenv("FLASK_DEBUG"): logger.setLevel(logging.DEBUG) root_logger.setLevel(logging.DEBUG) + root_logger.handlers = app.logger.handlers else: gunicorn_logger = logging.getLogger("gunicorn.error") root_logger.handlers = gunicorn_logger.handlers @@ -3276,7 +3278,7 @@ def task_update_question(): def task_cleanup(): task = p.parse(request) app.logger.info(f"task={task!r}") - # p.push({}, "/task/cleanup_part_1") + activity_gc.perform() return ""