From eb92169de9927fb0373d9154b041a857cebcde74 Mon Sep 17 00:00:00 2001 From: Thomas Sileo Date: Fri, 5 Apr 2019 21:36:56 +0200 Subject: [PATCH] More poussetaches integrations --- app.py | 102 +++++++++++++++++---------------------- docker-compose-tests.yml | 1 - docker-compose.yml | 8 +-- poussetaches.py | 61 +++++++++++++++++++++++ 4 files changed, 110 insertions(+), 62 deletions(-) diff --git a/app.py b/app.py index 46d3753..e320d83 100644 --- a/app.py +++ b/app.py @@ -62,7 +62,7 @@ from werkzeug.utils import secure_filename import activitypub import config -import tasks +import tasks # noqa: here just for the migration # FIXME(tsileo): remove me from activitypub import Box from activitypub import embed_collection from config import USER_AGENT @@ -2210,6 +2210,9 @@ def token_endpoint(): ) +################# +# Feeds + @app.route("/feed.json") def json_feed(): return Response( @@ -2234,22 +2237,48 @@ def rss_feed(): ) -@app.route("/task/t1") -def task_t1(): - p.push( - "https://mastodon.cloud/@iulius/101852467780804071/activity", - "/task/cache_object", - ) - return "ok" +########### +# Tasks +class Tasks: + @staticmethod + def cache_object(iri: str) -> None: + p.push(iri, "/task/cache_object") -@app.route("/task/t2", methods=["POST"]) -def task_t2(): - print(request) - print(request.headers) - task = p.parse(request) - print(task) - return "yay" + @staticmethod + def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: + p.push( + {"iri": iri, "also_cache_attachments": also_cache_attachments}, + "/task/cache_actor", + ) + + @staticmethod + def post_to_remote_inbox(payload: str, recp: str) -> None: + p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") + + @staticmethod + def forward_activity(iri: str) -> None: + p.push(iri, "/task/forward_activity") + + @staticmethod + def fetch_og_meta(iri: str) -> None: + p.push(iri, "/task/fetch_og_meta") + + @staticmethod + def process_new_activity(iri: str) -> None: + p.push(iri, "/task/process_new_activity") + + @staticmethod + def cache_attachments(iri: str) -> None: + p.push(iri, "/task/cache_attachments") + + @staticmethod + def finish_post_to_inbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_inbox") + + @staticmethod + def finish_post_to_outbox(iri: str) -> None: + p.push(iri, "/task/finish_post_to_outbox") @app.route("/task/fetch_og_meta", methods=["POST"]) @@ -2321,48 +2350,6 @@ def task_cache_object(): abort(500) return "" - -class Tasks: - @staticmethod - def cache_object(iri: str) -> None: - p.push(iri, "/task/cache_object") - - @staticmethod - def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: - p.push( - {"iri": iri, "also_cache_attachments": also_cache_attachments}, - "/task/cache_actor", - ) - - @staticmethod - def post_to_remote_inbox(payload: str, recp: str) -> None: - p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") - - @staticmethod - def forward_activity(iri: str) -> None: - p.push(iri, "/task/forward_activity") - - @staticmethod - def fetch_og_meta(iri: str) -> None: - p.push(iri, "/task/fetch_og_meta") - - @staticmethod - def process_new_activity(iri: str) -> None: - p.push(iri, "/task/process_new_activity") - - @staticmethod - def cache_attachments(iri: str) -> None: - p.push(iri, "/task/cache_attachments") - - @staticmethod - def finish_post_to_inbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_inbox") - - @staticmethod - def finish_post_to_outbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_outbox") - - @app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 def task_finish_post_to_outbox(): task = p.parse(request) @@ -2748,6 +2735,7 @@ def task_forward_activity(): @app.route("/task/post_to_remote_inbox", methods=["POST"]) def task_post_to_remote_inbox(): + """Post an activity to a remote inbox.""" task = p.parse(request) app.logger.info(f"task={task!r}") payload, to = task.payload["payload"], task.payload["to"] diff --git a/docker-compose-tests.yml b/docker-compose-tests.yml index fc94f79..3360bdc 100644 --- a/docker-compose-tests.yml +++ b/docker-compose-tests.yml @@ -33,7 +33,6 @@ services: - RABBITMQ_NODENAME=rabbit@my-rabbit poussetaches: image: "poussetaches:latest" - ports: - '7991' environment: - POUSSETACHES_AUTH_KEY=123 diff --git a/docker-compose.yml b/docker-compose.yml index bfb08a6..94c49ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: environment: - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_MONGODB_HOST=mongo:27017 - - POUSSETACHES_AUTH_KEY=123 + - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY} celery: image: 'microblogpub:latest' links: @@ -40,7 +40,7 @@ services: - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" poussetaches: image: "poussetaches:latest" - ports: - - '7991' + volumes: + - "${DATA_DIR}/poussetaches:/app/poussetaches/poussetaches_data" environment: - - POUSSETACHES_AUTH_KEY=123 + - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY} diff --git a/poussetaches.py b/poussetaches.py index 6bf9180..cabf5ca 100644 --- a/poussetaches.py +++ b/poussetaches.py @@ -1,10 +1,13 @@ import base64 import json import os +from typing import Dict from typing import Any +from typing import List from dataclasses import dataclass import flask import requests +from datetime import datetime POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") @@ -17,6 +20,18 @@ class Task: payload: Any +@dataclass +class GetTask: + payload: Any + expected: int + task_id: str + next_run: datetime + tries: int + url: str + last_error_status_code: int + last_error_body: str + + class PousseTaches: def __init__(self, api_url: str, base_url: str) -> None: self.api_url = api_url @@ -46,3 +61,49 @@ class PousseTaches: payload = json.loads(base64.b64decode(envelope["payload"])) return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore + + @staticmethod + def _expand_task(t: Dict[str, Any]) -> None: + try: + t["payload"] = json.loads(base64.b64decode(t["payload"])) + except json.JSONDecodeError: + t["payload"] = base64.b64decode(t["payload"]).decode() + + if t["last_error_body"]: + t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode() + + t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9)) + if t["last_run"]: + t["last_run"] = datetime.fromtimestamp(float(t["last__run"] / 1e9)) + else: + del t["last_run"] + + def _get(self, where: str) -> List[GetTask]: + out = [] + + resp = requests.get(self.api_url + f"/{where}") + resp.raise_for_status() + dat = resp.json() + for t in dat["tasks"]: + self._expand_task(t) + out.append(GetTask( + task_id=t["id"], + payload=t["payload"], + expected=t["expected"], + tries=t["tries"], + url=t["url"], + last_error_status_code=t["last_error_status_code"], + last_error_body=t["last_error_body"], + next_run=t["next_run"], + )) + + return out + + def get_success(self) -> List[GetTask]: + return self._get("success") + + def get_waiting(self) -> List[GetTask]: + return self._get("waiting") + + def get_dead(self) -> List[GetTask]: + return self._get("dead")