diff --git a/activitypub.py b/activitypub.py index 42478d6..0d4a5c2 100644 --- a/activitypub.py +++ b/activitypub.py @@ -3,12 +3,14 @@ import json import logging import os from datetime import datetime +from datetime import timezone from enum import Enum from typing import Any from typing import Dict from typing import List from typing import Optional +from dateutil import parser from bson.objectid import ObjectId from cachetools import LRUCache from feedgen.feed import FeedGenerator @@ -28,6 +30,7 @@ from config import ID from config import ME from config import USER_AGENT from config import USERNAME +from tasks import Tasks logger = logging.getLogger(__name__) @@ -211,7 +214,7 @@ class MicroblogPubBackend(Backend): logger.info(f"dereference {iri} via HTTP") return super().fetch_iri(iri) - def fetch_iri(self, iri: str) -> ap.ObjectType: + def fetch_iri(self, iri: str, no_cache=False) -> ap.ObjectType: if iri == ME["id"]: return ME @@ -225,8 +228,11 @@ class MicroblogPubBackend(Backend): # logger.info(f"{iri} found in DB cache") # ACTORS_CACHE[iri] = data["data"] # return data["data"] + if not no_cache: + data = self._fetch_iri(iri) + else: + return super().fetch_iri(iri) - data = self._fetch_iri(iri) logger.debug(f"_fetch_iri({iri!r}) == {data!r}") if ap._has_type(data["type"], ap.ACTOR_TYPES): logger.debug(f"caching actor {iri}") @@ -468,6 +474,15 @@ class MicroblogPubBackend(Backend): @ensure_it_is_me def inbox_create(self, as_actor: ap.Person, create: ap.Create) -> None: + # If it's a `Quesiion`, trigger an async task for updating it later (by fetching the remote and updating the + # local copy) + question = create.get_object() + if question.has_type(ap.ActivityType.QUESTION): + now = datetime.now(timezone.utc) + dt = parser.parse(question.closed or question.endTime) + minutes = int((dt - now).total_seconds() / 60) + Tasks.fetch_remote_question(create.id, minutes) + self._handle_replies(as_actor, create) @ensure_it_is_me diff --git a/app.py b/app.py index 1dfb157..e27e6bd 100644 --- a/app.py +++ b/app.py @@ -88,6 +88,7 @@ from utils import opengraph from utils.key import get_secret_key from utils.lookup import lookup from utils.media import Kind +from tasks import Tasks p = PousseTaches( os.getenv("MICROBLOGPUB_POUSSETACHES_HOST", "http://localhost:7991"), @@ -1167,7 +1168,7 @@ def remove_context(activity: Dict[str, Any]) -> Dict[str, Any]: return activity -def _add_answers_to_questions(raw_doc: Dict[str, Any]) -> None: +def _add_answers_to_question(raw_doc: Dict[str, Any]) -> None: activity = raw_doc["activity"] if ( ap._has_type(activity["type"], ActivityType.CREATE) @@ -1182,7 +1183,7 @@ def _add_answers_to_questions(raw_doc: Dict[str, Any]) -> None: .get(_answer_key(choice["name"]), 0), } now = datetime.now().astimezone() - if format_datetime(now) > activity["object"]["endTime"]: + if format_datetime(now) >= activity["object"]["endTime"]: activity["object"]["closed"] = activity["object"]["endTime"] @@ -1192,7 +1193,7 @@ def activity_from_doc(raw_doc: Dict[str, Any], embed: bool = False) -> Dict[str, # Handle Questions # TODO(tsileo): what about object embedded by ID/URL? - _add_answers_to_questions(raw_doc) + _add_answers_to_question(raw_doc) if embed: return remove_context(activity) return activity @@ -1569,6 +1570,19 @@ def admin_notifications(): ], } inbox_data, older_than, newer_than = paginated_query(DB.activities, q) + if not newer_than: + nstart = datetime.now(timezone.utc).isoformat() + else: + nstart = inbox_data[0]["_id"].generation_time.isoformat() + if not older_than: + nend = (datetime.now(timezone.utc) - timedelta(days=15)).isoformat() + else: + nend = inbox_data[-1]["_id"].generation_time.isoformat() + print(nstart, nend) + notifs = list(DB.notifications.find({"datetime": {"$lte": nstart, "$gt": nend}}).sort("_id", -1).limit(50)) + inbox_data.extend(notifs) + inbox_data = sorted(inbox_data, reverse=True, key=lambda doc: doc["_id"].generation_time) + print(inbox_data) return render_template( "stream.html", @@ -1664,6 +1678,7 @@ def api_vote(): tag=[], inReplyTo=note.id, ) + raw_note["@context"] = config.DEFAULT_CTX note = ap.Note(**raw_note) create = note.build_create() @@ -1947,10 +1962,11 @@ def api_new_question(): break answers.append({"type": ActivityType.NOTE.value, "name": a}) + open_for = int(_user_api_arg("open_for")) choices = { "endTime": ap.format_datetime( datetime.now().astimezone() - + timedelta(minutes=int(_user_api_arg("open_for"))) + + timedelta(minutes=open_for) ) } of = _user_api_arg("of") @@ -1974,6 +1990,8 @@ def api_new_question(): create = question.build_create() create_id = post_to_outbox(create) + Tasks.update_question_outbox(create_id, open_for) + return _user_api_response(activity=create_id) @@ -2427,51 +2445,6 @@ def rss_feed(): ) -########### -# Tasks - - -class Tasks: - @staticmethod - def cache_object(iri: str) -> None: - p.push(iri, "/task/cache_object") - - @staticmethod - def cache_actor(iri: str, also_cache_attachments: bool = True) -> None: - p.push( - {"iri": iri, "also_cache_attachments": also_cache_attachments}, - "/task/cache_actor", - ) - - @staticmethod - def post_to_remote_inbox(payload: str, recp: str) -> None: - p.push({"payload": payload, "to": recp}, "/task/post_to_remote_inbox") - - @staticmethod - def forward_activity(iri: str) -> None: - p.push(iri, "/task/forward_activity") - - @staticmethod - def fetch_og_meta(iri: str) -> None: - p.push(iri, "/task/fetch_og_meta") - - @staticmethod - def process_new_activity(iri: str) -> None: - p.push(iri, "/task/process_new_activity") - - @staticmethod - def cache_attachments(iri: str) -> None: - p.push(iri, "/task/cache_attachments") - - @staticmethod - def finish_post_to_inbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_inbox") - - @staticmethod - def finish_post_to_outbox(iri: str) -> None: - p.push(iri, "/task/finish_post_to_outbox") - - @app.route("/task/fetch_og_meta", methods=["POST"]) def task_fetch_og_meta(): task = p.parse(request) @@ -2986,17 +2959,70 @@ def task_post_to_remote_inbox(): return "" +@app.route("/task/fetch_remote_question", methods=["POST"]) +def task_fetch_remote_question(): + """Fetch a remote question for implementation that does not send Update.""" + task = p.parse(request) + app.logger.info(f"task={task!r}") + iri = task.payload + try: + app.logger.info(f"Fetching remote question {iri}") + local_question = DB.activities.find_one( + {"box": Box.INBOX.value, "remote_id": iri} + ) + remote_question = get_backend().fetch_iri(iri, no_cache=True) + if local_question["meta"].get("voted_for") or local_question["meta"]["subscribed"]: + DB.notifications.insert_one({"type": "question_ended", "datetime": datetime.now(timezone.utc).isoformat(), + "activity": remote_question}) + + DB.activities.update_one( + {"remote_id": iri, "box": Box.INBOX.value}, + {"$set": {"activity": remote_question}}, + ) + + except HTTPError as err: + app.logger.exception("request failed") + if 400 >= err.response.status_code >= 499: + app.logger.info("client error, no retry") + return "" + + raise TaskError() from err + except Exception as err: + app.logger.exception("task failed") + raise TaskError() from err + + return "" + + @app.route("/task/update_question", methods=["POST"]) def task_update_question(): - """Post an activity to a remote inbox.""" + """Sends an Update.""" task = p.parse(request) app.logger.info(f"task={task!r}") iri = task.payload try: app.logger.info(f"Updating question {iri}") - # TODO(tsileo): sends an Update with the question/iri as an actor, with the updated stats (LD sig will fail?) - # but to who? followers and people who voted? but this must not be visible right? - # also sends/trigger a notification when a poll I voted for ends like Mastodon? + cc = [ID + "/followers"] + doc = DB.activities.find_one( + {"box": Box.OUTBOX.value, "remote_id": iri} + ) + _add_answers_to_question(doc) + question = ap.Question(**doc["activity"]["object"]) + + raw_update = dict( + actor=question.id, + object=question.to_dict(embed=True), + attributedTo=MY_PERSON.id, + cc=list(set(cc)), + to=[ap.AS_PUBLIC], + ) + raw_update["@context"] = config.DEFAULT_CTX + + update = ap.Update(**raw_update) + print(update) + print(update.to_dict()) + post_to_outbox(update) + except HTTPError as err: app.logger.exception("request failed") if 400 >= err.response.status_code >= 499: diff --git a/templates/new.html b/templates/new.html index a03cc62..9b27df6 100644 --- a/templates/new.html +++ b/templates/new.html @@ -35,6 +35,7 @@ {% if request.args.get("question") == "1" %}
Open for:
- {% else %} -{% if real_end_time | gtnow %}This question ended {{ real_end_time | format_timeago }}.
- {% else %}This question ends {{ real_end_time | format_timeago }}{% endif %} - -