microblog.pub/core/meta.py

113 lines
2.4 KiB
Python
Raw Normal View History

2019-08-15 15:30:40 +00:00
from datetime import datetime
from enum import Enum
2019-07-29 20:53:41 +00:00
from enum import unique
from typing import Any
from typing import Dict
from typing import List
from typing import Union
from little_boxes import activitypub as ap
# Type alias for the plain dicts returned by the query/update helpers below.
_SubQuery = Dict[str, Any]
2019-07-29 20:46:53 +00:00
@unique
class Box(Enum):
    """Names of the boxes an activity can be filed under.

    The member values are the strings used for the ``box`` field of a
    sub-query (see the ``in_inbox``/``in_outbox`` helpers in this module).
    """

    INBOX = "inbox"
    OUTBOX = "outbox"
    REPLIES = "replies"
2019-07-29 20:46:53 +00:00
@unique
class MetaKey(Enum):
    """Field names that live under the ``meta.`` prefix of a document.

    Each value is the bare field name; the helpers in this module prepend
    ``meta.`` to build the full dotted key.
    """

    # Notifications
    NOTIFICATION = "notification"
    NOTIFICATION_UNREAD = "notification_unread"
    NOTIFICATION_FOLLOWS_BACK = "notification_follows_back"

    POLL_ANSWER = "poll_answer"
    STREAM = "stream"

    # Actor of the activity
    ACTOR_ID = "actor_id"
    ACTOR = "actor"
    ACTOR_HASH = "actor_hash"

    UNDO = "undo"
    PUBLISHED = "published"
    GC_KEEP = "gc_keep"

    # Object of the activity (and the object's own actor)
    OBJECT = "object"
    OBJECT_ID = "object_id"
    OBJECT_ACTOR = "object_actor"
    OBJECT_ACTOR_ID = "object_actor_id"
    OBJECT_ACTOR_HASH = "object_actor_hash"

    PUBLIC = "public"
    THREAD_ROOT_PARENT = "thread_root_parent"

    # Boolean-looking flags
    DELETED = "deleted"
    BOOSTED = "boosted"
    LIKED = "liked"

    # Counters
    COUNT_LIKE = "count_like"
    COUNT_BOOST = "count_boost"
    COUNT_REPLY = "count_reply"
def _meta(mk: MetaKey) -> str:
    """Return the dotted field name (``meta.<value>``) for the given key."""
    return "meta.{}".format(mk.value)
2019-08-15 15:30:40 +00:00
def flag(mk: MetaKey, val: Any) -> _SubQuery:
    """Build a one-entry sub-query mapping the ``meta.`` field for *mk* to *val*.

    *val* is stored as-is, so it may be a plain value or a nested
    operator dict.
    """
    field_name = _meta(mk)
    return {field_name: val}
def by_remote_id(remote_id: str) -> _SubQuery:
    """Return a sub-query selecting on the ``remote_id`` field."""
    return dict(remote_id=remote_id)
def in_inbox() -> _SubQuery:
    """Return a sub-query whose ``box`` field is the inbox value."""
    box_name = Box.INBOX.value
    return {"box": box_name}
def in_outbox() -> _SubQuery:
    """Return a sub-query whose ``box`` field is the outbox value."""
    box_name = Box.OUTBOX.value
    return {"box": box_name}
def by_type(type_: Union[ap.ActivityType, List[ap.ActivityType]]) -> _SubQuery:
    """Return a sub-query selecting on the ``type`` field.

    A single activity type yields an equality match on its ``.value``;
    a ``list`` of types yields an ``$in`` match over all their values.
    """
    if not isinstance(type_, list):
        return {"type": type_.value}

    accepted = [member.value for member in type_]
    return {"type": {"$in": accepted}}
def not_undo() -> _SubQuery:
    """Return a sub-query requiring the ``undo`` meta field to be ``False``."""
    undone = False
    return flag(MetaKey.UNDO, undone)
2019-08-16 20:27:59 +00:00
def not_deleted() -> _SubQuery:
    """Return a sub-query requiring the ``deleted`` meta field to be ``False``."""
    deleted = False
    return flag(MetaKey.DELETED, deleted)
def by_actor(actor: ap.BaseActivity) -> _SubQuery:
    """Return a sub-query matching the ``actor_id`` meta field to ``actor.id``."""
    actor_id = actor.id
    return flag(MetaKey.ACTOR_ID, actor_id)
2019-08-01 20:24:18 +00:00
def by_object_id(object_id: str) -> _SubQuery:
    """Return a sub-query matching the ``object_id`` meta field to *object_id*."""
    key = MetaKey.OBJECT_ID
    return flag(key, object_id)
2019-08-01 20:24:18 +00:00
def is_public() -> _SubQuery:
    """Return a sub-query requiring the ``public`` meta field to be ``True``."""
    public = True
    return flag(MetaKey.PUBLIC, public)
def inc(mk: MetaKey, val: int) -> _SubQuery:
    """Return an ``$inc`` update document for the ``meta.`` field of *mk*.

    The result has the shape ``{"$inc": {"meta.<key>": val}}``.
    """
    increment = flag(mk, val)
    return {"$inc": increment}
def upsert(data: Dict[MetaKey, Any]) -> _SubQuery:
    """Return a ``$set`` update document for several meta fields at once.

    Every key of *data* is translated to its dotted ``meta.`` field name;
    the values are passed through unchanged. An empty *data* yields
    ``{"$set": {}}``.
    """
    assignments: Dict[str, Any] = {
        _meta(key): value for key, value in data.items()
    }
    return {"$set": assignments}
2019-08-10 09:47:50 +00:00
2019-08-15 15:30:40 +00:00
def published_after(dt: datetime) -> _SubQuery:
    """Return a sub-query with a ``$gt`` condition on the ``published`` meta field.

    *dt* is converted with ``ap.format_datetime`` before being used as the
    comparison bound.
    """
    lower_bound = ap.format_datetime(dt)
    return flag(MetaKey.PUBLISHED, {"$gt": lower_bound})