diyalgo/diyalgo/expansions/timeline.py

47 lines
1.3 KiB
Python

from typing import List, Literal, Generator, Optional
import pdb
from datetime import datetime, timedelta, timezone
from mastodon import Mastodon
from diyalgo.models import Status
# Timeline names accepted by the `timeline` argument of fetch_timeline.
TIMELINES = Literal['home', 'local', 'public', 'tag', 'hashtag', 'list', 'id']
def fetch_timeline(
    client: Mastodon,
    timeline: TIMELINES = "public",
    after: Optional[datetime] = None,
    **kwargs
) -> Generator[List[Status], None, None]:
    """Yield successive pages of a timeline until a cutoff time is passed.

    The first page is requested with ``client.timeline`` and always yielded.
    Further pages are requested with ``client.fetch_next`` for as long as the
    last status of the current page was created after ``after``.

    Note that a page is yielded *before* its last status is compared against
    the cutoff, so the final page may contain statuses older than ``after``.

    Args:
        client: Client used to request the timeline pages.
        timeline: Name of the timeline to read; passed through to
            ``client.timeline``.
        after: Stop paging once the last status of a page is not newer than
            this moment. ``None`` (the default) means "24 hours before
            iteration starts". It is compared directly with each status's
            ``created_at`` -- presumably both must be timezone-aware;
            confirm against the client's return values.
        **kwargs: Extra keyword arguments forwarded to ``client.timeline``.

    Yields:
        The result of ``pack_statuses`` for each fetched page.
    """
    # Compute the default cutoff per call. A default written in the signature
    # would be evaluated only once, at import time, and go stale.
    if after is None:
        after = datetime.now(timezone.utc) - timedelta(days=1)

    page = client.timeline(timeline=timeline, **kwargs)
    yield pack_statuses(page)

    # `page and ...` guards against indexing into an empty page.
    # NOTE(review): using the last element as the oldest status assumes pages
    # are ordered newest-first -- confirm against the API.
    while page and page[-1]['created_at'] > after:
        page = client.fetch_next(page)
        if not page:
            # No further page (None or empty). A generator must finish with
            # `return`; raising StopIteration here becomes RuntimeError
            # under PEP 479.
            return
        yield pack_statuses(page)
def pack_statuses(statuses: list[dict]) -> list[Status]:
    """Convert raw status dicts into ``Status`` objects, skipping duplicates.

    For each input dict whose ``'id'`` has not been emitted yet:

    * if it carries a truthy ``'reblog'`` entry, that entry is converted to
      its own ``Status`` and placed in the output just before the wrapping
      status (unless a status with the same id is already in the output);
    * the dict itself is converted to a ``Status`` built without the
      ``'reblog'`` key, and the reblog ``Status`` (or ``None``) is then
      assigned to its ``reblog`` attribute.

    The input dicts are not modified.

    Args:
        statuses: Status dicts; each must have an ``'id'`` key and may have a
            ``'reblog'`` key holding a nested status dict.

    Returns:
        The ``Status`` objects in input order, with each newly seen reblog
        immediately preceding the status that reblogged it.
    """
    out = []
    # Ids of everything already in `out`, kept in a set so each membership
    # test is O(1) instead of rebuilding a list of ids per lookup.
    seen_ids = set()

    for s in statuses:
        if s['id'] in seen_ids:
            continue

        reblog = None
        fields = s
        reblog_data = s.get('reblog')
        if reblog_data:
            reblog = Status(**reblog_data)
            if reblog.id not in seen_ids:
                out.append(reblog)
                seen_ids.add(reblog.id)
            # Build the outer status from a copy without the raw reblog dict,
            # leaving the caller's data untouched.
            fields = {k: v for k, v in s.items() if k != 'reblog'}

        status = Status(**fields)
        status.reblog = reblog
        out.append(status)
        seen_ids.add(status.id)

    return out