2019-08-01 20:24:18 +00:00
|
|
|
from enum import Enum
|
|
|
|
from enum import unique
|
|
|
|
from typing import Any
|
2019-08-01 20:25:58 +00:00
|
|
|
from typing import Dict
|
2019-08-01 20:24:18 +00:00
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
from config import DB
|
|
|
|
|
|
|
|
_Q = Dict[str, Any]
|
|
|
|
_Doc = Optional[Dict[str, Any]]
|
|
|
|
|
|
|
|
|
|
|
|
@unique
|
|
|
|
class CollectionName(Enum):
|
|
|
|
ACTIVITIES = "activities"
|
2019-08-07 21:27:57 +00:00
|
|
|
REMOTE = "remote"
|
2019-08-01 20:24:18 +00:00
|
|
|
|
|
|
|
|
|
|
|
def find_one_activity(q: _Q) -> _Doc:
|
|
|
|
return DB[CollectionName.ACTIVITIES.value].find_one(q)
|
2019-08-04 14:34:30 +00:00
|
|
|
|
|
|
|
|
|
|
|
def update_one_activity(q: _Q, update: _Q) -> None:
|
|
|
|
DB[CollectionName.ACTIVITIES.value].update_one(q, update)
|
|
|
|
|
|
|
|
|
|
|
|
def update_many_activities(q: _Q, update: _Q) -> None:
|
|
|
|
DB[CollectionName.ACTIVITIES.value].update_many(q, update)
|
2019-08-07 21:27:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
def update_one_remote(filter_: _Q, update: _Q, upsert: bool = False) -> None:
|
|
|
|
DB[CollectionName.REMOTE.value].update_one(filter_, update, upsert)
|