import pdb
from typing import Optional
from datetime import datetime

from masto_bridges.config import Config
from masto_bridges.models import Account, List
from masto_bridges.post import Post, Status
from masto_bridges.logger import init_logger
from masto_bridges.repo import Repo
from masto_bridges.caldav import CalDAV

from mastodon import Mastodon, StreamListener


class Listener(StreamListener):
    """Stream listener that forwards incoming statuses to the enabled bridges.

    A git bridge (``Repo``) is created when ``config.ENABLE_GIT`` is truthy and
    a CalDAV bridge (``CalDAV``) when ``config.ENABLE_CALDAV`` is truthy.
    """

    def __init__(self, client: Mastodon, config: Optional[Config] = None):
        """Store the client and config and build the enabled bridges.

        Args:
            client: Mastodon client, kept on ``self.client``.
            config: Bridge configuration; a default ``Config()`` is created
                when omitted.
        """
        super(Listener, self).__init__()
        self.client = client

        if config is None:
            config = Config()
        self.config = config

        self.logger = init_logger('mastogit_bot-stream', basedir=self.config.LOGDIR)

        self.repo = None
        if self.config.ENABLE_GIT:
            self.repo = Repo(path=config.GIT_REPO)

        self.caldav = None
        # Truthiness (not ``is not None``) so that an explicit ``False``
        # disables the bridge, matching the check in ``Bot.__init__``.
        if self.config.ENABLE_CALDAV:
            self.caldav = CalDAV(config=self.config)

    def on_update(self, status: dict):
        """Handle a new status from the stream and cross-post it.

        Statuses that cannot be parsed into ``Status``, that have ``private``
        or ``direct`` visibility, or whose text starts with ``xpost`` are
        logged and skipped.

        Args:
            status: Raw status payload passed to ``Status(**status)``.
        """
        try:
            status = Status(**status)
        except Exception:
            self.logger.warning(f'Unhandled event: {status}')
            return

        if status.visibility in ('private', 'direct'):
            # not xposting dms
            self.logger.info('Not xposting private messages')
            return

        post = Post.from_status(status)
        if post.text.startswith('xpost'):
            self.logger.info('Not xposting an xpost')
            return

        # A disabled bridge counts as a success so that it cannot mask the
        # result of the bridge(s) that are actually enabled.
        if self.repo:
            git_success = self.repo.post(post.format_commit())
        else:
            git_success = True

        if self.caldav:
            caldav_success = self.caldav.post(post.format_caldav())
        else:
            caldav_success = True

        if git_success and caldav_success:
            self.logger.info('Posted to bridges!')
        else:
            # No exception is active here, so use error() rather than
            # exception(), which would append an empty traceback.
            self.logger.error('Failed to post to bridges!')

    def on_unknown_event(self, name, unknown_event=None):
        """An unknown mastodon API event has been received.

        The name contains the event-name and unknown_event contains the
        content of the unknown event.

        This function must be implemented, if unknown events should be
        handled without an error.
        """
        # Logged with error() because no exception is being handled here.
        self.logger.error(f'Unknown event type: {name}')


class Bot:
    """Bot that owns the Mastodon client, starts streaming and posts statuses."""

    def __init__(self, config: Optional[Config] = None, post_length=500):
        """Create the logger, the Mastodon client and the optional CalDAV bridge.

        Args:
            config: Bridge configuration; a default ``Config()`` is created
                when omitted.
            post_length: Maximum length accepted by :meth:`post`.
        """
        self._me = None  # type: Optional[Account]
        self._me_list = None  # type: Optional[List]

        if config is None:
            config = Config()
        self.config = config
        self.config.LOGDIR.mkdir(exist_ok=True)

        self.post_length = post_length
        self.logger = init_logger('mastogit_bot', basedir=self.config.LOGDIR)

        self.client = Mastodon(
            access_token=self.config.MASTO_TOKEN,
            api_base_url=self.config.MASTO_URL
        )

        self.caldav = None
        if self.config.ENABLE_CALDAV:
            self.caldav = CalDAV(self.config)

    def init_stream(self, run_async: bool = True):
        """Start streaming into a new :class:`Listener`.

        ``config.STREAM_MODE == 'list'`` streams the ``me`` list and
        ``'home'`` streams the user timeline; any other value is logged as an
        error and no stream is started.

        Args:
            run_async: Forwarded to the client's ``stream_*`` call.
        """
        # Listen to a stream consisting of just us.
        listener = Listener(client=self.client, config=self.config)
        self.logger.info('Initializing streaming')

        mode = self.config.STREAM_MODE
        if mode == 'list':
            self.client.stream_list(
                self.me_list.id,
                listener=listener,
                run_async=run_async
            )
        elif mode == 'home':
            self.client.stream_user(
                listener=listener,
                run_async=run_async
            )
        else:
            self.logger.error(f'Unknown STREAM_MODE {mode!r}, not starting a stream')

    def post(self, post: str):
        """Post ``post`` as a status and log it.

        Args:
            post: Text of the status.

        Raises:
            NotImplementedError: If ``post`` is longer than ``self.post_length``.
        """
        # TODO: Split long posts
        if len(post) > self.post_length:
            raise NotImplementedError(f"Cant split long posts yet, got post of length {len(post)} when max length is {self.post_length}")

        self.client.status_post(post)
        self.logger.info(f"Posted:\n{post}")

    def poll_caldav(self):
        """Poll CalDAV and post the returned events that pass the filters.

        Events whose ``cal_user`` does not match our account, and events that
        carry a ``cal_url``, are skipped. Does nothing (beyond a warning) when
        the CalDAV bridge is not enabled.
        """
        if self.caldav is None:
            self.logger.warning('CalDAV is not enabled, nothing to poll')
            return

        for event in self.caldav.poll():
            post = Post.from_vevent(event.vobject_instance.vevent)

            # Accept either the bare account name or the full ``user@host`` form.
            if post.cal_user.split('@')[0] != self.me.acct and post.cal_user != self.me.acct:
                self.logger.debug(
                    f"Not posting calendar event not from us. "
                    f"Got cal user: {post.cal_user} and we are {self.me.acct}"
                )
                continue

            if post.cal_url:
                self.logger.debug("Not posting a post we made on masto back to masto")
                continue

            self.post(post.format_masto())

    @property
    def me(self) -> Account:
        """Our own account, fetched from the client on first access and cached."""
        if self._me is None:
            self._me = Account(**self.client.me())
        return self._me

    def _make_me_list(self) -> List:
        """Create a list titled ``me`` that contains only our own account."""
        me_list = List(**self.client.list_create('me'))
        self.client.list_accounts_add(me_list.id, [self.me.id])
        self.logger.info('Created list with just me in it!')
        return me_list

    @property
    def me_list(self) -> List:
        """The list titled ``me``, looked up (or created) on first access and cached."""
        if self._me_list is None:
            candidates = [
                entry for entry in self.client.lists()
                if entry.get('title', '') == 'me'
            ]
            if candidates:
                self._me_list = List(**candidates[0])
            else:
                self._me_list = self._make_me_list()
        return self._me_list