More work for cleaning up old activities

This commit is contained in:
Thomas Sileo 2019-04-08 16:41:09 +02:00
parent 363dbf4b6a
commit 27622813ec
5 changed files with 222 additions and 147 deletions

262
app.py
View file

@ -599,23 +599,28 @@ def admin_login():
if request.method == "POST": if request.method == "POST":
csrf.protect() csrf.protect()
pwd = request.form.get("pass") pwd = request.form.get("pass")
if pwd and verify_pass(pwd): if devices:
if devices: resp = json.loads(request.form.get("resp"))
resp = json.loads(request.form.get("resp")) try:
print(resp) u2f.complete_authentication(session["challenge"], resp)
try: except ValueError as exc:
u2f.complete_authentication(session["challenge"], resp) print("failed", exc)
except ValueError as exc: abort(403)
print("failed", exc) return
abort(401) finally:
return session["challenge"] = None
finally:
session["challenge"] = None
session["logged_in"] = True session["logged_in"] = True
return redirect( return redirect(
request.args.get("redirect") or url_for("admin_notifications") request.args.get("redirect") or url_for("admin_notifications")
) )
elif pwd and verify_pass(pwd):
session["logged_in"] = True
return redirect(
request.args.get("redirect") or url_for("admin_notifications")
)
elif pwd:
abort(403)
else: else:
abort(401) abort(401)
@ -681,7 +686,8 @@ def u2f_register():
device, device_cert = u2f.complete_registration(session["challenge"], resp) device, device_cert = u2f.complete_registration(session["challenge"], resp)
session["challenge"] = None session["challenge"] = None
DB.u2f.insert_one({"device": device, "cert": device_cert}) DB.u2f.insert_one({"device": device, "cert": device_cert})
return "" session["logged_in"] = False
return redirect("/login")
####### #######
@ -693,133 +699,6 @@ def drop_cache():
return "Done" return "Done"
@app.route("/migration1_step1")
@login_required
def tmp_migrate():
for activity in DB.outbox.find():
activity["box"] = Box.OUTBOX.value
DB.activities.insert_one(activity)
for activity in DB.inbox.find():
activity["box"] = Box.INBOX.value
DB.activities.insert_one(activity)
for activity in DB.replies.find():
activity["box"] = Box.REPLIES.value
DB.activities.insert_one(activity)
return "Done"
@app.route("/migration1_step2")
@login_required
def tmp_migrate2():
# Remove buggy OStatus announce
DB.activities.remove(
{"activity.object": {"$regex": f"^tag:"}, "type": ActivityType.ANNOUNCE.value}
)
# Cache the object
for activity in DB.activities.find():
if (
activity["box"] == Box.OUTBOX.value
and activity["type"] == ActivityType.LIKE.value
):
like = ap.parse_activity(activity["activity"])
obj = like.get_object()
DB.activities.update_one(
{"remote_id": like.id},
{"$set": {"meta.object": obj.to_dict(embed=True)}},
)
elif activity["type"] == ActivityType.ANNOUNCE.value:
announce = ap.parse_activity(activity["activity"])
obj = announce.get_object()
DB.activities.update_one(
{"remote_id": announce.id},
{"$set": {"meta.object": obj.to_dict(embed=True)}},
)
return "Done"
@app.route("/migration2")
@login_required
def tmp_migrate3():
for activity in DB.activities.find():
try:
activity = ap.parse_activity(activity["activity"])
actor = activity.get_actor()
if actor.icon:
MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
if activity.type == ActivityType.CREATE.value:
for attachment in activity.get_object()._data.get("attachment", []):
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
except Exception:
app.logger.exception("failed")
return "Done"
@app.route("/migration3")
@login_required
def tmp_migrate4():
for activity in DB.activities.find(
{"box": Box.OUTBOX.value, "type": ActivityType.UNDO.value}
):
try:
activity = ap.parse_activity(activity["activity"])
if activity.get_object().type == ActivityType.FOLLOW.value:
DB.activities.update_one(
{"remote_id": activity.get_object().id},
{"$set": {"meta.undo": True}},
)
print(activity.get_object().to_dict())
except Exception:
app.logger.exception("failed")
for activity in DB.activities.find(
{"box": Box.INBOX.value, "type": ActivityType.UNDO.value}
):
try:
activity = ap.parse_activity(activity["activity"])
if activity.get_object().type == ActivityType.FOLLOW.value:
DB.activities.update_one(
{"remote_id": activity.get_object().id},
{"$set": {"meta.undo": True}},
)
print(activity.get_object().to_dict())
except Exception:
app.logger.exception("failed")
return "Done"
@app.route("/migration4")
@login_required
def tmp_migrate5():
for activity in DB.activities.find():
Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False)
return "Done"
@app.route("/migration5")
@login_required
def tmp_migrate6():
for activity in DB.activities.find():
# tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False)
try:
a = ap.parse_activity(activity["activity"])
if a.has_type([ActivityType.LIKE, ActivityType.FOLLOW]):
DB.activities.update_one(
{"remote_id": a.id},
{
"$set": {
"meta.object_actor": activitypub._actor_to_meta(
a.get_object().get_actor()
)
}
},
)
except Exception:
app.logger.exception(f"processing {activity} failed")
return "Done"
def paginated_query(db, q, limit=25, sort_key="_id"): def paginated_query(db, q, limit=25, sort_key="_id"):
older_than = newer_than = None older_than = newer_than = None
query_sort = -1 query_sort = -1
@ -1422,6 +1301,8 @@ def admin():
def admin_cleanup(): def admin_cleanup():
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d") d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
# (We keep Follow and Accept forever)
# Announce and Like cleanup # Announce and Like cleanup
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]: for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
# Migrate old (before meta.keep activities on the fly) # Migrate old (before meta.keep activities on the fly)
@ -1475,6 +1356,97 @@ def admin_cleanup():
} }
) )
# Create cleanup (more complicated)
# The one that mention our actor
DB.activities.update_many(
{
"box": Box.INBOX.value,
"meta.keep": {"$exists": False},
"activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
},
{"$set": {"meta.keep": True}},
)
DB.activities.update_many(
{
"box": Box.REPLIES.value,
"meta.keep": {"$exists": False},
"activity.tag.href": {"$regex": f"^{BASE_URL}"},
},
{"$set": {"meta.keep": True}},
)
# The replies of the outbox
DB.activities.update_many(
{"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
{"$set": {"meta.keep": True}},
)
# Track all the threads we participated
keep_threads = []
for data in DB.activities.find(
{
"box": Box.OUTBOX.value,
"type": ActivityType.CREATE.value,
"meta.thread_root_parent": {"$exists": True},
}
):
keep_threads.append(data["meta"]["thread_root_parent"])
for root_parent in set(keep_threads):
DB.activities.update_many(
{"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
)
DB.activities.update_many(
{
"box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
"meta.keep": {"$exists": False},
},
{"$set": {"meta.keep": False}},
)
return "OK"
@app.route("/admin/cleanup2", methods=["GET"])
@login_required
def admin_cleanup2():
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
# Go over the old Create activities
for data in DB.activities.find(
{
"box": Box.INBOX.value,
"type": ActivityType.CREATE.value,
"meta.keep": False,
"activity.published": {"$lt": d},
}
):
# Delete the cached attachment/
for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
MEDIA_CACHE.fs.delete(grid_item._id)
# Delete the Create activities that no longer have cached attachments
DB.activities.delete_many(
{
"box": Box.INBOX.value,
"type": ActivityType.CREATE.value,
"meta.keep": False,
"activity.published": {"$lt": d},
}
)
# Delete old replies we don't care about
DB.activities.delete_many(
{"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
)
# Remove all the attachments no tied to a remote_id (post celery migration)
for grid_item in MEDIA_CACHE.fs.find(
{"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
):
MEDIA_CACHE.fs.delete(grid_item._id)
# TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
return "OK" return "OK"
@ -1482,7 +1454,10 @@ def admin_cleanup():
@login_required @login_required
def admin_tasks(): def admin_tasks():
return render_template( return render_template(
"admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting() "admin_tasks.html",
dead=p.get_dead(),
waiting=p.get_waiting(),
cron=[], # cron=p.get_cron(),
) )
@ -2385,7 +2360,7 @@ def task_fetch_og_meta():
for og in og_metadata: for og in og_metadata:
if not og.get("image"): if not og.get("image"):
continue continue
MEDIA_CACHE.cache_og_image(og["image"]) MEDIA_CACHE.cache_og_image2(og["image"], iri)
app.logger.debug(f"OG metadata {og_metadata!r}") app.logger.debug(f"OG metadata {og_metadata!r}")
DB.activities.update_one( DB.activities.update_one(
@ -2613,7 +2588,7 @@ def task_cache_attachments():
or attachment.get("type") == ap.ActivityType.IMAGE.value or attachment.get("type") == ap.ActivityType.IMAGE.value
): ):
try: try:
MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT) MEDIA_CACHE.cache_attachment2(attachment["url"], iri)
except ValueError: except ValueError:
app.logger.exception(f"failed to cache {attachment}") app.logger.exception(f"failed to cache {attachment}")
@ -2710,6 +2685,7 @@ def task_process_new_activity():
tag_stream = False tag_stream = False
if activity.has_type(ap.ActivityType.ANNOUNCE): if activity.has_type(ap.ActivityType.ANNOUNCE):
# FIXME(tsileo): Ensure it's follower and store into a "dead activities" DB
try: try:
activity.get_object() activity.get_object()
tag_stream = True tag_stream = True

View file

@ -24,6 +24,7 @@ class Task:
class GetTask: class GetTask:
payload: Any payload: Any
expected: int expected: int
# schedule: str
task_id: str task_id: str
next_run: datetime next_run: datetime
tries: int tries: int
@ -37,14 +38,21 @@ class PousseTaches:
self.api_url = api_url self.api_url = api_url
self.base_url = base_url self.base_url = base_url
def push(self, payload: Any, path: str, expected=200) -> str: def push(
self, payload: Any, path: str, expected: int = 200, schedule: str = ""
) -> str:
# Encode our payload # Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode() p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it # Queue/push it
resp = requests.post( resp = requests.post(
self.api_url, self.api_url,
json={"url": self.base_url + path, "payload": p, "expected": expected}, json={
"url": self.base_url + path,
"payload": p,
"expected": expected,
"schedule": schedule,
},
) )
resp.raise_for_status() resp.raise_for_status()
@ -93,6 +101,7 @@ class PousseTaches:
task_id=t["id"], task_id=t["id"],
payload=t["payload"], payload=t["payload"],
expected=t["expected"], expected=t["expected"],
# shedule=t["schedule"],
tries=t["tries"], tries=t["tries"],
url=t["url"], url=t["url"],
last_error_status_code=t["last_error_status_code"], last_error_status_code=t["last_error_status_code"],
@ -103,6 +112,9 @@ class PousseTaches:
return out return out
def get_cron(self) -> List[GetTask]:
return self._get("cron")
def get_success(self) -> List[GetTask]: def get_success(self) -> List[GetTask]:
return self._get("success") return self._get("success")

View file

@ -6,6 +6,33 @@
{% include "header.html" %} {% include "header.html" %}
<div style="margin-top:50px;"> <div style="margin-top:50px;">
<h3>Cron</h3>
<table class="pure-table">
<thead>
<tr>
<th>#</th>
<th>URL</th>
<th>Payload</th>
<th>Schedule</th>
<th>Next run</th>
<th>Response</th>
</tr>
</thead>
<tbody>
{% for task in dead %}
<tr>
<td>{{ task.task_id }}</td>
<td>{{ task.url }} ({{ task.expected }})</td>
<td>{{ task.payload }}</td>
<td>{{ task.schedule }}</td>
<td>{{ task.next_run }}</td>
<td>Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }})</td>
</tr>
{% endfor %}
</tbody>
</table>
<h3>Dead</h3> <h3>Dead</h3>
<table class="pure-table"> <table class="pure-table">
<thead> <thead>
@ -22,7 +49,7 @@
{% for task in dead %} {% for task in dead %}
<tr> <tr>
<td>{{ task.task_id }}</td> <td>{{ task.task_id }}</td>
<td>{{ task.url }} ({{ task.expected }}</td> <td>{{ task.url }} ({{ task.expected }})</td>
<td>{{ task.payload }}</td> <td>{{ task.payload }}</td>
<td>{{ task.next_run }}</td> <td>{{ task.next_run }}</td>
<td>Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }})</td> <td>Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }})</td>

View file

@ -21,7 +21,7 @@ display:inline;
<input type="password" name="pass" placeholder="password"> <input type="password" name="pass" placeholder="password">
{% if u2f_enabled %} {% if u2f_enabled %}
<input type="hidden" name="resp" id="sig-payload" value=""> <input type="hidden" name="resp" id="sig-payload" value="">
<input type="submit" value="waiting for u2f" disabled> <input type="submit" value="waiting for u2f or login with password" disabled>
{% else %} {% else %}
<input type="submit" value="login"> <input type="submit" value="login">
{% endif %} {% endif %}

View file

@ -60,6 +60,25 @@ class MediaCache(object):
kind=Kind.OG_IMAGE.value, kind=Kind.OG_IMAGE.value,
) )
def cache_og_image2(self, url: str, remote_id: str) -> None:
if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}):
return
i = load(url, self.user_agent)
# Save the original attachment (gzipped)
i.thumbnail((100, 100))
with BytesIO() as buf:
with GzipFile(mode="wb", fileobj=buf) as f1:
i.save(f1, format=i.format)
buf.seek(0)
self.fs.put(
buf,
url=url,
size=100,
content_type=i.get_format_mimetype(),
kind=Kind.OG_IMAGE.value,
remote_id=remote_id,
)
def cache_attachment(self, url: str) -> None: def cache_attachment(self, url: str) -> None:
if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}): if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
return return
@ -98,6 +117,46 @@ class MediaCache(object):
) )
return return
def cache_attachment2(self, url: str, remote_id: str) -> None:
if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
return
if (
url.endswith(".png")
or url.endswith(".jpg")
or url.endswith(".jpeg")
or url.endswith(".gif")
):
i = load(url, self.user_agent)
# Save the original attachment (gzipped)
with BytesIO() as buf:
f1 = GzipFile(mode="wb", fileobj=buf)
i.save(f1, format=i.format)
f1.close()
buf.seek(0)
self.fs.put(
buf,
url=url,
size=None,
content_type=i.get_format_mimetype(),
kind=Kind.ATTACHMENT.value,
remote_id=remote_id,
)
# Save a thumbnail (gzipped)
i.thumbnail((720, 720))
with BytesIO() as buf:
with GzipFile(mode="wb", fileobj=buf) as f1:
i.save(f1, format=i.format)
buf.seek(0)
self.fs.put(
buf,
url=url,
size=720,
content_type=i.get_format_mimetype(),
kind=Kind.ATTACHMENT.value,
remote_id=remote_id,
)
return
# The attachment is not an image, download and save it anyway # The attachment is not an image, download and save it anyway
with requests.get( with requests.get(
url, stream=True, headers={"User-Agent": self.user_agent} url, stream=True, headers={"User-Agent": self.user_agent}
@ -115,6 +174,7 @@ class MediaCache(object):
size=None, size=None,
content_type=mimetypes.guess_type(url)[0], content_type=mimetypes.guess_type(url)[0],
kind=Kind.ATTACHMENT.value, kind=Kind.ATTACHMENT.value,
remote_id=remote_id,
) )
def cache_actor_icon(self, url: str) -> None: def cache_actor_icon(self, url: str) -> None: