From 363dbf4b6ac473f37e0660ec9905e8c8b7f4625d Mon Sep 17 00:00:00 2001
From: Thomas Sileo <t@a4.io>
Date: Sun, 7 Apr 2019 21:24:52 +0200
Subject: [PATCH] Start working on a clenaup task for old activities

---
 app.py          | 88 ++++++++++++++++++++++++++++++++++++++++++++++---
 poussetaches.py | 26 ++++++++-------
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/app.py b/app.py
index 19275a7..507c767 100644
--- a/app.py
+++ b/app.py
@@ -6,6 +6,7 @@ import os
 import traceback
 import urllib
 from datetime import datetime
+from datetime import timedelta
 from datetime import timezone
 from functools import wraps
 from io import BytesIO
@@ -507,6 +508,7 @@ def handle_activitypub_error(error):
 
 class TaskError(Exception):
     """Raised to log the error for poussetaches."""
+
     def __init__(self):
         self.message = traceback.format_exc()
 
@@ -1415,13 +1417,72 @@ def admin():
     )
 
 
+@app.route("/admin/cleanup", methods=["GET"])
+@login_required
+def admin_cleanup():
+    d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
+
+    # Announce and Like cleanup
+    for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
+        # Migrate old (before meta.keep activities on the fly)
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+                "activity.object": {"$regex": f"^{BASE_URL}"},
+            },
+            {"$set": {"meta.keep": True}},
+        )
+
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+                "activity.object.id": {"$regex": f"^{BASE_URL}"},
+            },
+            {"$set": {"meta.keep": True}},
+        )
+
+        DB.activities.update_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": {"$exists": False},
+            },
+            {"$set": {"meta.keep": False}},
+        )
+        # End of the migration
+
+        # Delete old activities
+        DB.activities.delete_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": False,
+                "activity.published": {"$lt": d},
+            }
+        )
+
+        # And delete the soft-deleted one
+        DB.activities.delete_many(
+            {
+                "box": Box.INBOX.value,
+                "type": ap_type.value,
+                "meta.keep": False,
+                "meta.deleted": True,
+            }
+        )
+
+    return "OK"
+
+
 @app.route("/admin/tasks", methods=["GET"])
 @login_required
 def admin_tasks():
     return render_template(
-        "admin_tasks.html",
-        dead=p.get_dead(),
-        waiting=p.get_waiting(),
+        "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting()
     )
 
 
@@ -2239,6 +2300,7 @@ def token_endpoint():
 #################
 # Feeds
 
+
 @app.route("/feed.json")
 def json_feed():
     return Response(
@@ -2266,6 +2328,7 @@ def rss_feed():
 ###########
 # Tasks
 
+
 class Tasks:
     @staticmethod
     def cache_object(iri: str) -> None:
@@ -2373,6 +2436,7 @@ def task_cache_object():
 
     return ""
 
+
 @app.route("/task/finish_post_to_outbox", methods=["POST"])  # noqa:C901
 def task_finish_post_to_outbox():
     task = p.parse(request)
@@ -2642,12 +2706,15 @@ def task_process_new_activity():
         # following = ap.get_backend().following()
         should_forward = False
         should_delete = False
+        should_keep = False
 
         tag_stream = False
         if activity.has_type(ap.ActivityType.ANNOUNCE):
             try:
                 activity.get_object()
                 tag_stream = True
+                if activity.get_object_id().startswith(BASE_URL):
+                    should_keep = True
             except (NotAnActivityError, BadActivityError):
                 app.logger.exception(f"failed to get announce object for {activity!r}")
                 # Most likely on OStatus notice
@@ -2657,12 +2724,21 @@ def task_process_new_activity():
                 # The announced activity is deleted/gone, drop it
                 should_delete = True
 
+        elif activity.has_type(ap.ActivityType.FOLLOW):
+            # FIXME(tsileo): ensure it's a follow where the server is the object
+            should_keep = True
+
         elif activity.has_type(ap.ActivityType.CREATE):
             note = activity.get_object()
             # Make the note part of the stream if it's not a reply, or if it's a local reply
             if not note.inReplyTo or note.inReplyTo.startswith(ID):
                 tag_stream = True
 
+            if (note.inReplyTo and note.inReplyTo.startswith(ID)) or note.has_mention(
+                ID
+            ):
+                should_keep = True
+
             if note.inReplyTo:
                 try:
                     reply = ap.fetch_remote_activity(note.inReplyTo)
@@ -2672,6 +2748,7 @@ def task_process_new_activity():
                         # The reply is public "local reply", forward the reply (i.e. the original activity) to the
                         # original recipients
                         should_forward = True
+                        should_keep = True
                 except NotAnActivityError:
                     # Most likely a reply to an OStatus notce
                     should_delete = True
@@ -2699,7 +2776,9 @@ def task_process_new_activity():
                 should_forward = True
 
         elif activity.has_type(ap.ActivityType.LIKE):
-            if not activity.get_object_id().startswith(BASE_URL):
+            if activity.get_object_id().startswith(BASE_URL):
+                should_keep = True
+            else:
                 # We only want to keep a like if it's a like for a local activity
                 # (Pleroma relay the likes it received, we don't want to store them)
                 should_delete = True
@@ -2716,6 +2795,7 @@ def task_process_new_activity():
             {"remote_id": activity.id},
             {
                 "$set": {
+                    "meta.keep": should_keep,
                     "meta.stream": tag_stream,
                     "meta.forwarded": should_forward,
                     "meta.deleted": should_delete,
diff --git a/poussetaches.py b/poussetaches.py
index 7909416..28314f3 100644
--- a/poussetaches.py
+++ b/poussetaches.py
@@ -60,7 +60,9 @@ class PousseTaches:
         print(f"envelope={envelope!r}")
         payload = json.loads(base64.b64decode(envelope["payload"]))
 
-        return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload)  # type: ignore
+        return Task(
+            req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
+        )  # type: ignore
 
     @staticmethod
     def _expand_task(t: Dict[str, Any]) -> None:
@@ -86,16 +88,18 @@ class PousseTaches:
         dat = resp.json()
         for t in dat["tasks"]:
             self._expand_task(t)
-            out.append(GetTask(
-                task_id=t["id"],
-                payload=t["payload"],
-                expected=t["expected"],
-                tries=t["tries"],
-                url=t["url"],
-                last_error_status_code=t["last_error_status_code"],
-                last_error_body=t["last_error_body"],
-                next_run=t["next_run"],
-            ))
+            out.append(
+                GetTask(
+                    task_id=t["id"],
+                    payload=t["payload"],
+                    expected=t["expected"],
+                    tries=t["tries"],
+                    url=t["url"],
+                    last_error_status_code=t["last_error_status_code"],
+                    last_error_body=t["last_error_body"],
+                    next_run=t["next_run"],
+                )
+            )
 
         return out