diff --git a/app.py b/app.py index 1da742d..30c8241 100644 --- a/app.py +++ b/app.py @@ -816,16 +816,41 @@ def paginated_query(db, q, limit=25, sort_key="_id"): return outbox_data, older_than, newer_than +CACHING = True + + +def _get_cached(type_="html", arg=None): + if not CACHING: + return None + logged_in = session.get("logged_in") + if not logged_in: + cached = DB.cache2.find_one({"path": request.path, "type": type_, "arg": arg}) + if cached: + app.logger.info("from cache") + return cached['response_data'] + return None + +def _cache(resp, type_="html", arg=None): + if not CACHING: + return None + logged_in = session.get("logged_in") + if not logged_in: + DB.cache2.update_one( + {"path": request.path, "type": type_, "arg": arg}, + {"$set": {"response_data": resp, "date": datetime.now(timezone.utc)}}, + upsert=True, + ) + return None + + @app.route("/") def index(): if is_api_request(): return jsonify(**ME) - logged_in = session.get("logged_in", False) - if not logged_in: - cached = DB.cache.find_one({"path": request.path, "type": "html"}) - if cached: - app.logger.info("from cache") - return cached['response_data'] + cache_arg = f"{request.args.get('older_than', '')}:{request.args.get('newer_than', '')}" + cached = _get_cached("html", cache_arg) + if cached: + return cached q = { "box": Box.OUTBOX.value, @@ -859,12 +884,7 @@ def index(): newer_than=newer_than, pinned=pinned, ) - if not logged_in: - DB.cache.update_one( - {"path": request.path, "type": "html"}, - {"$set": {"response_data": resp, "date": datetime.now(timezone.utc)}}, - upsert=True, - ) + _cache(resp, "html", cache_arg) return resp @@ -1011,32 +1031,41 @@ def note_by_id(note_id): @app.route("/nodeinfo") def nodeinfo(): - q = { - "box": Box.OUTBOX.value, - "meta.deleted": False, # TODO(tsileo): retrieve deleted and expose tombstone - "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]}, - } + response = _get_cached("api") + cached = True + if not response: + cached = False + q = { + "box": Box.OUTBOX.value, + "meta.deleted": False, # TODO(tsileo): retrieve deleted and expose tombstone + "type": {"$in": [ActivityType.CREATE.value, ActivityType.ANNOUNCE.value]}, + } + + response = json.dumps( + { + "version": "2.0", + "software": { + "name": "microblogpub", + "version": f"Microblog.pub {VERSION}", + }, + "protocols": ["activitypub"], + "services": {"inbound": [], "outbound": []}, + "openRegistrations": False, + "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, + "metadata": { + "sourceCode": "https://github.com/tsileo/microblog.pub", + "nodeName": f"@{USERNAME}@{DOMAIN}", + }, + } + ) + + if not cached: + _cache(response, "api") return Response( headers={ "Content-Type": "application/json; profile=http://nodeinfo.diaspora.software/ns/schema/2.0#" }, - response=json.dumps( - { - "version": "2.0", - "software": { - "name": "microblogpub", - "version": f"Microblog.pub {VERSION}", - }, - "protocols": ["activitypub"], - "services": {"inbound": [], "outbound": []}, - "openRegistrations": False, - "usage": {"users": {"total": 1}, "localPosts": DB.activities.count(q)}, - "metadata": { - "sourceCode": "https://github.com/tsileo/microblog.pub", - "nodeName": f"@{USERNAME}@{DOMAIN}", - }, - } - ), + response=response, ) diff --git a/config.py b/config.py index dfb553c..b66ce71 100644 --- a/config.py +++ b/config.py @@ -109,8 +109,8 @@ def create_indexes(): ("activity.object.id", pymongo.ASCENDING), ("meta.deleted", pymongo.ASCENDING), ]) - DB.cache.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING)]) - DB.cache.create_index("date", expireAfterSeconds=60) + DB.cache2.create_index([("path", pymongo.ASCENDING), ("type", pymongo.ASCENDING), ("arg", pymongo.ASCENDING)]) + DB.cache2.create_index("date", expireAfterSeconds=3600*12) # Index for the block query DB.activities.create_index( diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index b487dc9..8a3a677 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -1,5 +1,16 @@ version: '3' services: + flower: + image: microblogpub:latest + links: + - mongo + - rabbitmq + command: 'celery flower -l info -A tasks --broker amqp://guest@rabbitmq// --address=0.0.0.0 --port=5556' + environment: + - MICROBLOGPUB_AMQP_BROKER=pyamqp://guest@rabbitmq// + - MICROBLOGPUB_MONGODB_HOST=mongo:27017 + ports: + - "5556:5556" celery: image: microblogpub:latest links: diff --git a/requirements.txt b/requirements.txt index d2eec2d..d9fccea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ python-dateutil libsass +flower gunicorn piexif requests diff --git a/tasks.py b/tasks.py index c05d200..96b185b 100644 --- a/tasks.py +++ b/tasks.py @@ -6,6 +6,7 @@ import random import requests from celery import Celery from little_boxes import activitypub as ap +from little_boxes.errors import BadActivityError from little_boxes.errors import ActivityGoneError from little_boxes.errors import ActivityNotFoundError from little_boxes.errors import NotAnActivityError @@ -59,7 +60,8 @@ def process_new_activity(self, iri: str) -> None: try: activity.get_object() tag_stream = True - except NotAnActivityError: + except (NotAnActivityError, BadActivityError): + log.exception(f"failed to get announce object for {activity!r}") # Most likely on OStatus notice tag_stream = False should_delete = True @@ -317,6 +319,26 @@ def post_to_inbox(activity: ap.BaseActivity) -> None: finish_post_to_inbox.delay(activity.id) +def invalidate_cache(activity): + if activity.has_type(ap.ActivityType.LIKE): + if activity.get_object().id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.ANNOUNCE): + if activity.get_object.id.startswith(BASE_URL): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UNDO): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.DELETE): + # TODO(tsileo): only invalidate if it's a delete of a reply + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.UPDATE): + DB.cache2.remove() + elif activity.has_type(ap.ActivityType.CREATE): + note = activity.get_object() + if not note.inReplyTo or note.inReplyTo.startswith(ID): + DB.cache2.remove() + # FIXME(tsileo): check if it's a reply of a reply + @app.task(bind=True, max_retries=MAX_RETRIES) # noqa: C901 def finish_post_to_inbox(self, iri: str) -> None: try: @@ -345,6 +367,10 @@ def finish_post_to_inbox(self, iri: str) -> None: back.inbox_undo_announce(MY_PERSON, obj) elif obj.has_type(ap.ActivityType.FOLLOW): back.undo_new_follower(MY_PERSON, obj) + try: + invalidate_cache(activity) + except Exception: + log.exception("failed to invalidate cache") except (ActivityGoneError, ActivityNotFoundError, NotAnActivityError): log.exception(f"no retry") except Exception as err: @@ -396,6 +422,8 @@ def finish_post_to_outbox(self, iri: str) -> None: log.info(f"recipients={recipients}") activity = ap.clean_activity(activity.to_dict()) + DB.cache2.remove() + payload = json.dumps(activity) for recp in recipients: log.debug(f"posting to {recp}")