More poussetaches integrations

This commit is contained in:
Thomas Sileo 2019-04-05 21:36:56 +02:00
parent ec64d24449
commit eb92169de9
4 changed files with 110 additions and 62 deletions

102
app.py
View file

@ -62,7 +62,7 @@ from werkzeug.utils import secure_filename
import activitypub import activitypub
import config import config
import tasks import tasks # noqa: here just for the migration # FIXME(tsileo): remove me
from activitypub import Box from activitypub import Box
from activitypub import embed_collection from activitypub import embed_collection
from config import USER_AGENT from config import USER_AGENT
@ -2210,6 +2210,9 @@ def token_endpoint():
) )
#################
# Feeds
@app.route("/feed.json") @app.route("/feed.json")
def json_feed(): def json_feed():
return Response( return Response(
@ -2234,22 +2237,48 @@ def rss_feed():
) )
@app.route("/task/t1") ###########
def task_t1(): # Tasks
p.push(
"https://mastodon.cloud/@iulius/101852467780804071/activity",
"/task/cache_object",
)
return "ok"
class Tasks:
@staticmethod
def cache_object(iri: str) -> None:
p.push(iri, "/task/cache_object")
@app.route("/task/t2", methods=["POST"]) @staticmethod
def task_t2(): def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
print(request) p.push(
print(request.headers) {"iri": iri, "also_cache_attachments": also_cache_attachments},
task = p.parse(request) "/task/cache_actor",
print(task) )
return "yay"
@staticmethod
def post_to_remote_inbox(payload: str, recp: str) -> None:
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
@staticmethod
def forward_activity(iri: str) -> None:
p.push(iri, "/task/forward_activity")
@staticmethod
def fetch_og_meta(iri: str) -> None:
p.push(iri, "/task/fetch_og_meta")
@staticmethod
def process_new_activity(iri: str) -> None:
p.push(iri, "/task/process_new_activity")
@staticmethod
def cache_attachments(iri: str) -> None:
p.push(iri, "/task/cache_attachments")
@staticmethod
def finish_post_to_inbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_inbox")
@staticmethod
def finish_post_to_outbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_outbox")
@app.route("/task/fetch_og_meta", methods=["POST"]) @app.route("/task/fetch_og_meta", methods=["POST"])
@ -2321,48 +2350,6 @@ def task_cache_object():
abort(500) abort(500)
return "" return ""
class Tasks:
@staticmethod
def cache_object(iri: str) -> None:
p.push(iri, "/task/cache_object")
@staticmethod
def cache_actor(iri: str, also_cache_attachments: bool = True) -> None:
p.push(
{"iri": iri, "also_cache_attachments": also_cache_attachments},
"/task/cache_actor",
)
@staticmethod
def post_to_remote_inbox(payload: str, recp: str) -> None:
p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox")
@staticmethod
def forward_activity(iri: str) -> None:
p.push(iri, "/task/forward_activity")
@staticmethod
def fetch_og_meta(iri: str) -> None:
p.push(iri, "/task/fetch_og_meta")
@staticmethod
def process_new_activity(iri: str) -> None:
p.push(iri, "/task/process_new_activity")
@staticmethod
def cache_attachments(iri: str) -> None:
p.push(iri, "/task/cache_attachments")
@staticmethod
def finish_post_to_inbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_inbox")
@staticmethod
def finish_post_to_outbox(iri: str) -> None:
p.push(iri, "/task/finish_post_to_outbox")
@app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901 @app.route("/task/finish_post_to_outbox", methods=["POST"]) # noqa:C901
def task_finish_post_to_outbox(): def task_finish_post_to_outbox():
task = p.parse(request) task = p.parse(request)
@ -2748,6 +2735,7 @@ def task_forward_activity():
@app.route("/task/post_to_remote_inbox", methods=["POST"]) @app.route("/task/post_to_remote_inbox", methods=["POST"])
def task_post_to_remote_inbox(): def task_post_to_remote_inbox():
"""Post an activity to a remote inbox."""
task = p.parse(request) task = p.parse(request)
app.logger.info(f"task={task!r}") app.logger.info(f"task={task!r}")
payload, to = task.payload["payload"], task.payload["to"] payload, to = task.payload["payload"], task.payload["to"]

View file

@ -33,7 +33,6 @@ services:
- RABBITMQ_NODENAME=rabbit@my-rabbit - RABBITMQ_NODENAME=rabbit@my-rabbit
poussetaches: poussetaches:
image: "poussetaches:latest" image: "poussetaches:latest"
ports:
- '7991' - '7991'
environment: environment:
- POUSSETACHES_AUTH_KEY=123 - POUSSETACHES_AUTH_KEY=123

View file

@ -14,7 +14,7 @@ services:
environment: environment:
- MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq// - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rmq//
- MICROBLOGPUB_MONGODB_HOST=mongo:27017 - MICROBLOGPUB_MONGODB_HOST=mongo:27017
- POUSSETACHES_AUTH_KEY=123 - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY}
celery: celery:
image: 'microblogpub:latest' image: 'microblogpub:latest'
links: links:
@ -40,7 +40,7 @@ services:
- "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq" - "${DATA_DIR}/rabbitmq:/var/lib/rabbitmq"
poussetaches: poussetaches:
image: "poussetaches:latest" image: "poussetaches:latest"
ports: volumes:
- '7991' - "${DATA_DIR}/poussetaches:/app/poussetaches/poussetaches_data"
environment: environment:
- POUSSETACHES_AUTH_KEY=123 - POUSSETACHES_AUTH_KEY=${POUSSETACHES_AUTH_KEY}

View file

@ -1,10 +1,13 @@
import base64 import base64
import json import json
import os import os
from typing import Dict
from typing import Any from typing import Any
from typing import List
from dataclasses import dataclass from dataclasses import dataclass
import flask import flask
import requests import requests
from datetime import datetime
POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY") POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
@ -17,6 +20,18 @@ class Task:
payload: Any payload: Any
@dataclass
class GetTask:
payload: Any
expected: int
task_id: str
next_run: datetime
tries: int
url: str
last_error_status_code: int
last_error_body: str
class PousseTaches: class PousseTaches:
def __init__(self, api_url: str, base_url: str) -> None: def __init__(self, api_url: str, base_url: str) -> None:
self.api_url = api_url self.api_url = api_url
@ -46,3 +61,49 @@ class PousseTaches:
payload = json.loads(base64.b64decode(envelope["payload"])) payload = json.loads(base64.b64decode(envelope["payload"]))
return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore return Task(req_id=envelope["req_id"], tries=envelope["tries"], payload=payload) # type: ignore
@staticmethod
def _expand_task(t: Dict[str, Any]) -> None:
try:
t["payload"] = json.loads(base64.b64decode(t["payload"]))
except json.JSONDecodeError:
t["payload"] = base64.b64decode(t["payload"]).decode()
if t["last_error_body"]:
t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
if t["last_run"]:
t["last_run"] = datetime.fromtimestamp(float(t["last__run"] / 1e9))
else:
del t["last_run"]
def _get(self, where: str) -> List[GetTask]:
out = []
resp = requests.get(self.api_url + f"/{where}")
resp.raise_for_status()
dat = resp.json()
for t in dat["tasks"]:
self._expand_task(t)
out.append(GetTask(
task_id=t["id"],
payload=t["payload"],
expected=t["expected"],
tries=t["tries"],
url=t["url"],
last_error_status_code=t["last_error_status_code"],
last_error_body=t["last_error_body"],
next_run=t["next_run"],
))
return out
def get_success(self) -> List[GetTask]:
return self._get("success")
def get_waiting(self) -> List[GetTask]:
return self._get("waiting")
def get_dead(self) -> List[GetTask]:
return self._get("dead")