diff --git a/app.py b/app.py
index 507c767..7d00994 100644
--- a/app.py
+++ b/app.py
@@ -599,23 +599,28 @@ def admin_login():
if request.method == "POST":
csrf.protect()
pwd = request.form.get("pass")
- if pwd and verify_pass(pwd):
- if devices:
- resp = json.loads(request.form.get("resp"))
- print(resp)
- try:
- u2f.complete_authentication(session["challenge"], resp)
- except ValueError as exc:
- print("failed", exc)
- abort(401)
- return
- finally:
- session["challenge"] = None
+ if devices:
+ resp = json.loads(request.form.get("resp"))
+ try:
+ u2f.complete_authentication(session["challenge"], resp)
+ except ValueError as exc:
+ print("failed", exc)
+ abort(403)
+ return
+ finally:
+ session["challenge"] = None
session["logged_in"] = True
return redirect(
request.args.get("redirect") or url_for("admin_notifications")
)
+ elif pwd and verify_pass(pwd):
+ session["logged_in"] = True
+ return redirect(
+ request.args.get("redirect") or url_for("admin_notifications")
+ )
+ elif pwd:
+ abort(403)
else:
abort(401)
@@ -681,7 +686,8 @@ def u2f_register():
device, device_cert = u2f.complete_registration(session["challenge"], resp)
session["challenge"] = None
DB.u2f.insert_one({"device": device, "cert": device_cert})
- return ""
+ session["logged_in"] = False
+ return redirect("/login")
#######
@@ -693,133 +699,6 @@ def drop_cache():
return "Done"
-@app.route("/migration1_step1")
-@login_required
-def tmp_migrate():
- for activity in DB.outbox.find():
- activity["box"] = Box.OUTBOX.value
- DB.activities.insert_one(activity)
- for activity in DB.inbox.find():
- activity["box"] = Box.INBOX.value
- DB.activities.insert_one(activity)
- for activity in DB.replies.find():
- activity["box"] = Box.REPLIES.value
- DB.activities.insert_one(activity)
- return "Done"
-
-
-@app.route("/migration1_step2")
-@login_required
-def tmp_migrate2():
- # Remove buggy OStatus announce
- DB.activities.remove(
- {"activity.object": {"$regex": f"^tag:"}, "type": ActivityType.ANNOUNCE.value}
- )
- # Cache the object
- for activity in DB.activities.find():
- if (
- activity["box"] == Box.OUTBOX.value
- and activity["type"] == ActivityType.LIKE.value
- ):
- like = ap.parse_activity(activity["activity"])
- obj = like.get_object()
- DB.activities.update_one(
- {"remote_id": like.id},
- {"$set": {"meta.object": obj.to_dict(embed=True)}},
- )
- elif activity["type"] == ActivityType.ANNOUNCE.value:
- announce = ap.parse_activity(activity["activity"])
- obj = announce.get_object()
- DB.activities.update_one(
- {"remote_id": announce.id},
- {"$set": {"meta.object": obj.to_dict(embed=True)}},
- )
- return "Done"
-
-
-@app.route("/migration2")
-@login_required
-def tmp_migrate3():
- for activity in DB.activities.find():
- try:
- activity = ap.parse_activity(activity["activity"])
- actor = activity.get_actor()
- if actor.icon:
- MEDIA_CACHE.cache(actor.icon["url"], Kind.ACTOR_ICON)
- if activity.type == ActivityType.CREATE.value:
- for attachment in activity.get_object()._data.get("attachment", []):
- MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
- except Exception:
- app.logger.exception("failed")
- return "Done"
-
-
-@app.route("/migration3")
-@login_required
-def tmp_migrate4():
- for activity in DB.activities.find(
- {"box": Box.OUTBOX.value, "type": ActivityType.UNDO.value}
- ):
- try:
- activity = ap.parse_activity(activity["activity"])
- if activity.get_object().type == ActivityType.FOLLOW.value:
- DB.activities.update_one(
- {"remote_id": activity.get_object().id},
- {"$set": {"meta.undo": True}},
- )
- print(activity.get_object().to_dict())
- except Exception:
- app.logger.exception("failed")
- for activity in DB.activities.find(
- {"box": Box.INBOX.value, "type": ActivityType.UNDO.value}
- ):
- try:
- activity = ap.parse_activity(activity["activity"])
- if activity.get_object().type == ActivityType.FOLLOW.value:
- DB.activities.update_one(
- {"remote_id": activity.get_object().id},
- {"$set": {"meta.undo": True}},
- )
- print(activity.get_object().to_dict())
- except Exception:
- app.logger.exception("failed")
- return "Done"
-
-
-@app.route("/migration4")
-@login_required
-def tmp_migrate5():
- for activity in DB.activities.find():
- Tasks.cache_actor(activity["remote_id"], also_cache_attachments=False)
-
- return "Done"
-
-
-@app.route("/migration5")
-@login_required
-def tmp_migrate6():
- for activity in DB.activities.find():
- # tasks.cache_actor.delay(activity["remote_id"], also_cache_attachments=False)
-
- try:
- a = ap.parse_activity(activity["activity"])
- if a.has_type([ActivityType.LIKE, ActivityType.FOLLOW]):
- DB.activities.update_one(
- {"remote_id": a.id},
- {
- "$set": {
- "meta.object_actor": activitypub._actor_to_meta(
- a.get_object().get_actor()
- )
- }
- },
- )
- except Exception:
- app.logger.exception(f"processing {activity} failed")
-
- return "Done"
-
-
def paginated_query(db, q, limit=25, sort_key="_id"):
older_than = newer_than = None
query_sort = -1
@@ -1422,6 +1301,8 @@ def admin():
def admin_cleanup():
d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
+ # (We keep Follow and Accept forever)
+
# Announce and Like cleanup
for ap_type in [ActivityType.ANNOUNCE, ActivityType.LIKE]:
# Migrate old (before meta.keep activities on the fly)
@@ -1475,6 +1356,97 @@ def admin_cleanup():
}
)
+ # Create cleanup (more complicated)
+ # The one that mention our actor
+ DB.activities.update_many(
+ {
+ "box": Box.INBOX.value,
+ "meta.keep": {"$exists": False},
+ "activity.object.tag.href": {"$regex": f"^{BASE_URL}"},
+ },
+ {"$set": {"meta.keep": True}},
+ )
+ DB.activities.update_many(
+ {
+ "box": Box.REPLIES.value,
+ "meta.keep": {"$exists": False},
+ "activity.tag.href": {"$regex": f"^{BASE_URL}"},
+ },
+ {"$set": {"meta.keep": True}},
+ )
+
+ # The replies of the outbox
+ DB.activities.update_many(
+ {"meta.thread_root_parent": {"$regex": f"^{BASE_URL}"}},
+ {"$set": {"meta.keep": True}},
+ )
+ # Track all the threads we participated
+ keep_threads = []
+ for data in DB.activities.find(
+ {
+ "box": Box.OUTBOX.value,
+ "type": ActivityType.CREATE.value,
+ "meta.thread_root_parent": {"$exists": True},
+ }
+ ):
+ keep_threads.append(data["meta"]["thread_root_parent"])
+
+ for root_parent in set(keep_threads):
+ DB.activities.update_many(
+ {"meta.thread_root_parent": root_parent}, {"$set": {"meta.keep": True}}
+ )
+
+ DB.activities.update_many(
+ {
+ "box": {"$in": [Box.REPLIES.value, Box.INBOX.value]},
+ "meta.keep": {"$exists": False},
+ },
+ {"$set": {"meta.keep": False}},
+ )
+ return "OK"
+
+
+@app.route("/admin/cleanup2", methods=["GET"])
+@login_required
+def admin_cleanup2():
+ d = (datetime.utcnow() - timedelta(days=45)).strftime("%Y-%m-%d")
+
+ # Go over the old Create activities
+ for data in DB.activities.find(
+ {
+ "box": Box.INBOX.value,
+ "type": ActivityType.CREATE.value,
+ "meta.keep": False,
+ "activity.published": {"$lt": d},
+ }
+ ):
+        # Delete the cached attachments
+ for grid_item in MEDIA_CACHE.fs.find({"remote_id": data["remote_id"]}):
+ MEDIA_CACHE.fs.delete(grid_item._id)
+
+ # Delete the Create activities that no longer have cached attachments
+ DB.activities.delete_many(
+ {
+ "box": Box.INBOX.value,
+ "type": ActivityType.CREATE.value,
+ "meta.keep": False,
+ "activity.published": {"$lt": d},
+ }
+ )
+
+ # Delete old replies we don't care about
+ DB.activities.delete_many(
+ {"box": Box.REPLIES.value, "meta.keep": False, "activity.published": {"$lt": d}}
+ )
+
+ # Remove all the attachments no tied to a remote_id (post celery migration)
+ for grid_item in MEDIA_CACHE.fs.find(
+ {"kind": {"$in": ["og", "attachment"]}, "remote_id": {"$exists": False}}
+ ):
+ MEDIA_CACHE.fs.delete(grid_item._id)
+
+ # TODO(tsileo): iterator over "actor_icon" and look for unused one in a separate task
+
return "OK"
@@ -1482,7 +1454,10 @@ def admin_cleanup():
@login_required
def admin_tasks():
return render_template(
- "admin_tasks.html", dead=p.get_dead(), waiting=p.get_waiting()
+ "admin_tasks.html",
+ dead=p.get_dead(),
+ waiting=p.get_waiting(),
+ cron=[], # cron=p.get_cron(),
)
@@ -2385,7 +2360,7 @@ def task_fetch_og_meta():
for og in og_metadata:
if not og.get("image"):
continue
- MEDIA_CACHE.cache_og_image(og["image"])
+ MEDIA_CACHE.cache_og_image2(og["image"], iri)
app.logger.debug(f"OG metadata {og_metadata!r}")
DB.activities.update_one(
@@ -2613,7 +2588,7 @@ def task_cache_attachments():
or attachment.get("type") == ap.ActivityType.IMAGE.value
):
try:
- MEDIA_CACHE.cache(attachment["url"], Kind.ATTACHMENT)
+ MEDIA_CACHE.cache_attachment2(attachment["url"], iri)
except ValueError:
app.logger.exception(f"failed to cache {attachment}")
@@ -2710,6 +2685,7 @@ def task_process_new_activity():
tag_stream = False
if activity.has_type(ap.ActivityType.ANNOUNCE):
+ # FIXME(tsileo): Ensure it's follower and store into a "dead activities" DB
try:
activity.get_object()
tag_stream = True
diff --git a/poussetaches.py b/poussetaches.py
index 28314f3..e844dee 100644
--- a/poussetaches.py
+++ b/poussetaches.py
@@ -24,6 +24,7 @@ class Task:
class GetTask:
payload: Any
expected: int
+ # schedule: str
task_id: str
next_run: datetime
tries: int
@@ -37,14 +38,21 @@ class PousseTaches:
self.api_url = api_url
self.base_url = base_url
- def push(self, payload: Any, path: str, expected=200) -> str:
+ def push(
+ self, payload: Any, path: str, expected: int = 200, schedule: str = ""
+ ) -> str:
# Encode our payload
p = base64.b64encode(json.dumps(payload).encode()).decode()
# Queue/push it
resp = requests.post(
self.api_url,
- json={"url": self.base_url + path, "payload": p, "expected": expected},
+ json={
+ "url": self.base_url + path,
+ "payload": p,
+ "expected": expected,
+ "schedule": schedule,
+ },
)
resp.raise_for_status()
@@ -93,6 +101,7 @@ class PousseTaches:
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
+                # schedule=t["schedule"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
@@ -103,6 +112,9 @@ class PousseTaches:
return out
+ def get_cron(self) -> List[GetTask]:
+ return self._get("cron")
+
def get_success(self) -> List[GetTask]:
return self._get("success")
diff --git a/templates/admin_tasks.html b/templates/admin_tasks.html
index 38e5cbf..14f6f1c 100644
--- a/templates/admin_tasks.html
+++ b/templates/admin_tasks.html
@@ -6,6 +6,33 @@
{% include "header.html" %}
+
Cron
+
+
+
+ # |
+ URL |
+ Payload |
+ Schedule |
+ Next run |
+ Response |
+
+
+
+
+     {% for task in cron %}
+
+ {{ task.task_id }} |
+ {{ task.url }} ({{ task.expected }}) |
+ {{ task.payload }} |
+ {{ task.schedule }} |
+ {{ task.next_run }} |
+ Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }}) |
+
+ {% endfor %}
+
+
+
Dead
@@ -22,7 +49,7 @@
{% for task in dead %}
{{ task.task_id }} |
- {{ task.url }} ({{ task.expected }} |
+ {{ task.url }} ({{ task.expected }}) |
{{ task.payload }} |
{{ task.next_run }} |
Tries #{{ task.tries }}: {{ task.last_error_body }} ({{ task.last_error_status_code }}) |
diff --git a/templates/login.html b/templates/login.html
index fde73d2..0d5f368 100644
--- a/templates/login.html
+++ b/templates/login.html
@@ -21,7 +21,7 @@ display:inline;
{% if u2f_enabled %}
-
+
{% else %}
{% endif %}
diff --git a/utils/media.py b/utils/media.py
index 66ad02a..06559f7 100644
--- a/utils/media.py
+++ b/utils/media.py
@@ -60,6 +60,25 @@ class MediaCache(object):
kind=Kind.OG_IMAGE.value,
)
+ def cache_og_image2(self, url: str, remote_id: str) -> None:
+ if self.fs.find_one({"url": url, "kind": Kind.OG_IMAGE.value}):
+ return
+ i = load(url, self.user_agent)
+        # Save a small thumbnail (gzipped)
+ i.thumbnail((100, 100))
+ with BytesIO() as buf:
+ with GzipFile(mode="wb", fileobj=buf) as f1:
+ i.save(f1, format=i.format)
+ buf.seek(0)
+ self.fs.put(
+ buf,
+ url=url,
+ size=100,
+ content_type=i.get_format_mimetype(),
+ kind=Kind.OG_IMAGE.value,
+ remote_id=remote_id,
+ )
+
def cache_attachment(self, url: str) -> None:
if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
return
@@ -98,6 +117,46 @@ class MediaCache(object):
)
return
+ def cache_attachment2(self, url: str, remote_id: str) -> None:
+ if self.fs.find_one({"url": url, "kind": Kind.ATTACHMENT.value}):
+ return
+ if (
+ url.endswith(".png")
+ or url.endswith(".jpg")
+ or url.endswith(".jpeg")
+ or url.endswith(".gif")
+ ):
+ i = load(url, self.user_agent)
+ # Save the original attachment (gzipped)
+ with BytesIO() as buf:
+ f1 = GzipFile(mode="wb", fileobj=buf)
+ i.save(f1, format=i.format)
+ f1.close()
+ buf.seek(0)
+ self.fs.put(
+ buf,
+ url=url,
+ size=None,
+ content_type=i.get_format_mimetype(),
+ kind=Kind.ATTACHMENT.value,
+ remote_id=remote_id,
+ )
+ # Save a thumbnail (gzipped)
+ i.thumbnail((720, 720))
+ with BytesIO() as buf:
+ with GzipFile(mode="wb", fileobj=buf) as f1:
+ i.save(f1, format=i.format)
+ buf.seek(0)
+ self.fs.put(
+ buf,
+ url=url,
+ size=720,
+ content_type=i.get_format_mimetype(),
+ kind=Kind.ATTACHMENT.value,
+ remote_id=remote_id,
+ )
+ return
+
# The attachment is not an image, download and save it anyway
with requests.get(
url, stream=True, headers={"User-Agent": self.user_agent}
@@ -115,6 +174,7 @@ class MediaCache(object):
size=None,
content_type=mimetypes.guess_type(url)[0],
kind=Kind.ATTACHMENT.value,
+ remote_id=remote_id,
)
def cache_actor_icon(self, url: str) -> None: