forked from forks/microblog.pub
More GC work
This commit is contained in:
parent
ace2575cf9
commit
cbd7fc6446
2 changed files with 81 additions and 4 deletions
|
@ -1,16 +1,19 @@
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
from time import perf_counter
|
||||||
|
from typing import Any
|
||||||
|
from typing import Dict
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from little_boxes import activitypub as ap
|
from little_boxes import activitypub as ap
|
||||||
|
|
||||||
import activitypub
|
import activitypub
|
||||||
from activitypub import Box
|
from activitypub import Box
|
||||||
|
from config import DAYS_TO_KEEP
|
||||||
from config import ID
|
from config import ID
|
||||||
from config import ME
|
from config import ME
|
||||||
from config import MEDIA_CACHE
|
from config import MEDIA_CACHE
|
||||||
from config import DAYS_TO_KEEP
|
|
||||||
from utils.migrations import DB
|
from utils.migrations import DB
|
||||||
|
|
||||||
back = activitypub.MicroblogPubBackend()
|
back = activitypub.MicroblogPubBackend()
|
||||||
|
@ -45,20 +48,29 @@ def threads_of_interest() -> List[str]:
|
||||||
return list(out)
|
return list(out)
|
||||||
|
|
||||||
|
|
||||||
def perform() -> None:
|
def _keep(data: Dict[str, Any]):
|
||||||
|
DB.activities.update_one({"_id": data["_id"]}, {"$set": {"meta.gc_keep": True}})
|
||||||
|
|
||||||
|
|
||||||
|
def perform() -> None: # noqa: C901
|
||||||
|
start = perf_counter()
|
||||||
d = (datetime.utcnow() - timedelta(days=DAYS_TO_KEEP)).strftime("%Y-%m-%d")
|
d = (datetime.utcnow() - timedelta(days=DAYS_TO_KEEP)).strftime("%Y-%m-%d")
|
||||||
toi = threads_of_interest()
|
toi = threads_of_interest()
|
||||||
logger.info(f"thread_of_interest={toi!r}")
|
logger.info(f"thread_of_interest={toi!r}")
|
||||||
|
|
||||||
|
create_deleted = 0
|
||||||
|
create_count = 0
|
||||||
# Go over the old Create activities
|
# Go over the old Create activities
|
||||||
for data in DB.activities.find(
|
for data in DB.activities.find(
|
||||||
{
|
{
|
||||||
"box": Box.INBOX.value,
|
"box": Box.INBOX.value,
|
||||||
"type": ap.ActivityType.CREATE.value,
|
"type": ap.ActivityType.CREATE.value,
|
||||||
"activity.published": {"$lt": d},
|
"activity.published": {"$lt": d},
|
||||||
|
"meta.gc_keep": {"$exists": False},
|
||||||
}
|
}
|
||||||
):
|
).limit(500):
|
||||||
try:
|
try:
|
||||||
|
create_count += 1
|
||||||
remote_id = data["remote_id"]
|
remote_id = data["remote_id"]
|
||||||
meta = data["meta"]
|
meta = data["meta"]
|
||||||
activity = ap.parse_activity(data["activity"])
|
activity = ap.parse_activity(data["activity"])
|
||||||
|
@ -66,6 +78,7 @@ def perform() -> None:
|
||||||
|
|
||||||
# This activity has been bookmarked, keep it
|
# This activity has been bookmarked, keep it
|
||||||
if meta.get("bookmarked"):
|
if meta.get("bookmarked"):
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Inspect the object
|
# Inspect the object
|
||||||
|
@ -73,21 +86,25 @@ def perform() -> None:
|
||||||
|
|
||||||
# This activity mentions the server actor, keep it
|
# This activity mentions the server actor, keep it
|
||||||
if obj.has_mention(ID):
|
if obj.has_mention(ID):
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# This activity is a direct reply of one the server actor activity, keep it
|
# This activity is a direct reply of one the server actor activity, keep it
|
||||||
in_reply_to = obj.get_in_reply_to()
|
in_reply_to = obj.get_in_reply_to()
|
||||||
if in_reply_to and in_reply_to.startswith(ID):
|
if in_reply_to and in_reply_to.startswith(ID):
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# This activity is part of a thread we want to keep, keep it
|
# This activity is part of a thread we want to keep, keep it
|
||||||
if in_reply_to and meta.get("thread_root_parent"):
|
if in_reply_to and meta.get("thread_root_parent"):
|
||||||
thread_root_parent = meta["thread_root_parent"]
|
thread_root_parent = meta["thread_root_parent"]
|
||||||
if thread_root_parent.startswith(ID) or thread_root_parent in toi:
|
if thread_root_parent.startswith(ID) or thread_root_parent in toi:
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# This activity was boosted or liked, keep it
|
# This activity was boosted or liked, keep it
|
||||||
if meta.get("boosted") or meta.get("liked"):
|
if meta.get("boosted") or meta.get("liked"):
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# TODO(tsileo): remove after tests
|
# TODO(tsileo): remove after tests
|
||||||
|
@ -95,6 +112,7 @@ def perform() -> None:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"{activity!r} would not have been deleted, skipping for now"
|
f"{activity!r} would not have been deleted, skipping for now"
|
||||||
)
|
)
|
||||||
|
_keep(data)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Delete the cached attachment
|
# Delete the cached attachment
|
||||||
|
@ -103,5 +121,62 @@ def perform() -> None:
|
||||||
|
|
||||||
# Delete the activity
|
# Delete the activity
|
||||||
DB.activities.delete_one({"_id": data["_id"]})
|
DB.activities.delete_one({"_id": data["_id"]})
|
||||||
|
create_deleted += 1
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(f"failed to process {data!r}")
|
logger.exception(f"failed to process {data!r}")
|
||||||
|
|
||||||
|
after_gc_create = perf_counter()
|
||||||
|
time_to_gc_create = start - after_gc_create
|
||||||
|
logger.info(
|
||||||
|
f"{time_to_gc_create:.2f} seconds to analyze {create_count} Create, {create_deleted} deleted"
|
||||||
|
)
|
||||||
|
|
||||||
|
announce_count = 0
|
||||||
|
announce_deleted = 0
|
||||||
|
# Go over the old Create activities
|
||||||
|
for data in DB.activities.find(
|
||||||
|
{
|
||||||
|
"box": Box.INBOX.value,
|
||||||
|
"type": ap.ActivityType.ANNOUNCE.value,
|
||||||
|
"activity.published": {"$lt": d},
|
||||||
|
"meta.gc_keep": {"$exists": False},
|
||||||
|
}
|
||||||
|
).limit(500):
|
||||||
|
try:
|
||||||
|
announce_count += 1
|
||||||
|
remote_id = data["remote_id"]
|
||||||
|
meta = data["meta"]
|
||||||
|
activity = ap.parse_activity(data["activity"])
|
||||||
|
logger.info(f"activity={activity!r}")
|
||||||
|
|
||||||
|
# This activity has been bookmarked, keep it
|
||||||
|
if meta.get("bookmarked"):
|
||||||
|
_keep(data)
|
||||||
|
continue
|
||||||
|
|
||||||
|
object_id = activity.get_object_id()
|
||||||
|
|
||||||
|
# This announce is for a local activity (i.e. from the outbox), keep it
|
||||||
|
if object_id.startswith(ID):
|
||||||
|
_keep(data)
|
||||||
|
continue
|
||||||
|
|
||||||
|
for grid_item in MEDIA_CACHE.fs.find({"remote_id": remote_id}):
|
||||||
|
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||||
|
|
||||||
|
# TODO(tsileo): here for legacy reason, this needs to be removed at some point
|
||||||
|
for grid_item in MEDIA_CACHE.fs.find({"remote_id": object_id}):
|
||||||
|
MEDIA_CACHE.fs.delete(grid_item._id)
|
||||||
|
|
||||||
|
# Delete the activity
|
||||||
|
DB.activities.delete_one({"_id": data["_id"]})
|
||||||
|
|
||||||
|
announce_deleted += 1
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f"failed to process {data!r}")
|
||||||
|
|
||||||
|
after_gc_announce = perf_counter()
|
||||||
|
time_to_gc_announce = after_gc_create - after_gc_announce
|
||||||
|
logger.info(
|
||||||
|
f"{time_to_gc_announce:.2f} seconds to analyze {announce_count} Announce, {announce_deleted} deleted"
|
||||||
|
)
|
4
app.py
4
app.py
|
@ -58,6 +58,7 @@ from requests.exceptions import HTTPError
|
||||||
from u2flib_server import u2f
|
from u2flib_server import u2f
|
||||||
from werkzeug.utils import secure_filename
|
from werkzeug.utils import secure_filename
|
||||||
|
|
||||||
|
import activity_gc
|
||||||
import activitypub
|
import activitypub
|
||||||
import config
|
import config
|
||||||
from activitypub import Box
|
from activitypub import Box
|
||||||
|
@ -116,6 +117,7 @@ root_logger = logging.getLogger()
|
||||||
if os.getenv("FLASK_DEBUG"):
|
if os.getenv("FLASK_DEBUG"):
|
||||||
logger.setLevel(logging.DEBUG)
|
logger.setLevel(logging.DEBUG)
|
||||||
root_logger.setLevel(logging.DEBUG)
|
root_logger.setLevel(logging.DEBUG)
|
||||||
|
root_logger.handlers = app.logger.handlers
|
||||||
else:
|
else:
|
||||||
gunicorn_logger = logging.getLogger("gunicorn.error")
|
gunicorn_logger = logging.getLogger("gunicorn.error")
|
||||||
root_logger.handlers = gunicorn_logger.handlers
|
root_logger.handlers = gunicorn_logger.handlers
|
||||||
|
@ -3276,7 +3278,7 @@ def task_update_question():
|
||||||
def task_cleanup():
|
def task_cleanup():
|
||||||
task = p.parse(request)
|
task = p.parse(request)
|
||||||
app.logger.info(f"task={task!r}")
|
app.logger.info(f"task={task!r}")
|
||||||
# p.push({}, "/task/cleanup_part_1")
|
activity_gc.perform()
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue