masto-bridges/masto_bridges/bot.py

165 lines
5.3 KiB
Python

import pdb
from typing import Optional
from datetime import datetime
from masto_bridges.config import Config
from masto_bridges.models import Account, List
from masto_bridges.post import Post, Status
from masto_bridges.logger import init_logger
from masto_bridges.repo import Repo
from masto_bridges.caldav import CalDAV
from mastodon import Mastodon, StreamListener
class Listener(StreamListener):
    """Stream listener that cross-posts incoming statuses to the enabled bridges.

    Each status received by :meth:`on_update` is turned into a ``Post`` and
    handed to the git repository and/or the CalDAV calendar, depending on
    which bridges the configuration enables.
    """

    def __init__(self, client: Mastodon, config: Optional[Config] = None):
        """Set up logging and the enabled bridges.

        Args:
            client: Mastodon API client; stored on the listener as ``self.client``.
            config: Bridge configuration. A default ``Config()`` is created
                when omitted.
        """
        super().__init__()
        self.client = client
        self.config = Config() if config is None else config
        self.logger = init_logger('mastogit_bot-stream', basedir=self.config.LOGDIR)

        # A bridge attribute stays ``None`` when that bridge is disabled, which
        # is what ``on_update`` checks before posting to it.
        self.repo: Optional[Repo] = None
        if self.config.ENABLE_GIT:
            self.repo = Repo(path=self.config.GIT_REPO)

        # Plain truthiness (not ``is not None``): an explicit ``False`` must
        # disable CalDAV, the same way ``ENABLE_GIT`` is tested above.
        self.caldav: Optional[CalDAV] = None
        if self.config.ENABLE_CALDAV:
            self.caldav = CalDAV(config=self.config)

    def on_update(self, status: dict):
        """Cross-post a newly received status to every enabled bridge.

        Statuses are ignored when they cannot be parsed into a ``Status``,
        when their visibility is ``private`` or ``direct``, or when the
        resulting post text starts with ``xpost``.

        Args:
            status: Raw status payload delivered by the streaming API.
        """
        try:
            parsed = Status(**status)
        except Exception:
            # Anything that does not fit the Status model is only logged.
            # ``Exception`` (rather than a bare ``except``) lets
            # KeyboardInterrupt / SystemExit propagate.
            self.logger.warning(f'Unhandled event: {status}')
            return

        if parsed.visibility in ('private', 'direct'):
            # not xposting dms
            self.logger.info('Not xposting private messages')
            return

        post = Post.from_status(parsed)
        if post.text.startswith('xpost'):
            # NOTE(review): presumably these are posts a bridge created
            # itself, skipped to avoid a loop -- confirm.
            self.logger.info('Not xposting an xpost')
            return

        # A disabled bridge counts as a success; otherwise running with only
        # one bridge enabled would always be reported as a failure.
        # NOTE(review): assumes ``post()`` on both bridges returns a truthy
        # value on success -- confirm against Repo / CalDAV.
        git_success = self.repo.post(post.format_commit()) if self.repo else True
        caldav_success = self.caldav.post(post.format_caldav()) if self.caldav else True

        if git_success and caldav_success:
            self.logger.info('Posted to bridges!')
        else:
            # ``error`` instead of ``exception``: there is no active exception
            # here, so ``exception`` would append a bogus "NoneType: None".
            self.logger.error('Failed to post to bridges!')

    def on_unknown_event(self, name, unknown_event=None):
        """An unknown mastodon API event has been received. The name contains the event-name and unknown_event
        contains the content of the unknown event.

        This function must be implemented, if unknown events should be handled without an error.
        """
        # Logged at error level without traceback: no exception is being handled here.
        self.logger.error(f'Unknown event type: {name}')
class Bot:
    """Mastodon bot: streams statuses to a ``Listener`` and posts CalDAV events.

    The bot owns the Mastodon client, starts the streaming listener that
    forwards statuses to the bridges, and can poll CalDAV for events to post
    back to Mastodon.
    """

    def __init__(self, config: Optional[Config] = None, post_length=500):
        """Create the Mastodon client and, if enabled, the CalDAV bridge.

        Args:
            config: Bot configuration. A default ``Config()`` is created when
                omitted.
            post_length: Maximum number of characters :meth:`post` accepts.
        """
        # Lazily filled caches behind the ``me`` / ``me_list`` properties.
        self._me: Optional[Account] = None
        self._me_list: Optional[List] = None

        self.config = Config() if config is None else config
        # ``parents=True`` so a nested log directory can be created in one go.
        self.config.LOGDIR.mkdir(parents=True, exist_ok=True)
        self.post_length = post_length
        self.logger = init_logger('mastogit_bot', basedir=self.config.LOGDIR)

        self.client = Mastodon(
            access_token=self.config.MASTO_TOKEN,
            api_base_url=self.config.MASTO_URL,
        )

        # Stays ``None`` when CalDAV is disabled; ``poll_caldav`` checks for it.
        self.caldav: Optional[CalDAV] = None
        if self.config.ENABLE_CALDAV:
            self.caldav = CalDAV(self.config)

    def init_stream(self, run_async: bool = True):
        """Start streaming statuses into a new ``Listener``.

        With ``STREAM_MODE == 'list'`` the stream is the list returned by
        :attr:`me_list`; with ``'home'`` it is the user stream.

        Args:
            run_async: Passed through to the client's streaming call.

        Returns:
            Whatever the client's streaming call returns, or ``None`` when
            ``STREAM_MODE`` is not recognised.
        """
        # Listen to a stream consisting of just us.
        listener = Listener(client=self.client, config=self.config)
        self.logger.info('Initializing streaming')

        mode = self.config.STREAM_MODE
        if mode == 'list':
            return self.client.stream_list(
                self.me_list.id,
                listener=listener,
                run_async=run_async,
            )
        if mode == 'home':
            return self.client.stream_user(
                listener=listener,
                run_async=run_async,
            )

        # Previously an unrecognised mode was a silent no-op; make it visible.
        self.logger.error(f'Unknown STREAM_MODE {mode!r}, not streaming')
        return None

    def post(self, post: str):
        """Publish ``post`` as a new status.

        Args:
            post: Text of the status.

        Raises:
            NotImplementedError: If ``post`` is longer than ``post_length``.
        """
        # TODO: Split long posts
        if len(post) > self.post_length:
            raise NotImplementedError(f"Cant split long posts yet, got post of length {len(post)} when max length is {self.post_length}")
        self.client.status_post(post)
        self.logger.info(f"Posted:\n{post}")

    def poll_caldav(self):
        """Poll CalDAV and post eligible events to Mastodon.

        An event is skipped when its calendar user is not this account, or
        when it carries a ``cal_url``. Does nothing, apart from logging a
        warning, when the CalDAV bridge is disabled.
        """
        if self.caldav is None:
            # Without this guard a disabled bridge fails with AttributeError.
            self.logger.warning('CalDAV bridge is disabled, nothing to poll')
            return

        events = self.caldav.poll()
        for event in events or ():
            post = Post.from_vevent(event.vobject_instance.vevent)
            acct = self.me.acct

            # The calendar user may be either "name" or "name@host".
            if acct not in (post.cal_user.split('@')[0], post.cal_user):
                self.logger.debug(f"Not posting calendar event not from us. Got cal user: {post.cal_user} and we are {acct}")
                continue

            if post.cal_url:
                self.logger.debug("Not posting a post we made on masto back to masto")
                continue

            self.post(post.format_masto())

    @property
    def me(self) -> Account:
        """The bot's own account, fetched from the API on first access and cached."""
        if self._me is None:
            self._me = Account(**self.client.me())
        return self._me

    def _make_me_list(self) -> List:
        """Create a list titled ``me`` and add this bot's own account to it."""
        me_list = List(**self.client.list_create('me'))
        self.client.list_accounts_add(me_list.id, [self.me.id])
        self.logger.info('Created list with just me in it!')
        return me_list

    @property
    def me_list(self) -> List:
        """The list titled ``me``, created on first access if missing; cached."""
        if self._me_list is None:
            # Use the first existing list titled 'me', if there is one.
            existing = next(
                (lst for lst in self.client.lists() if lst.get('title', '') == 'me'),
                None,
            )
            if existing is not None:
                self._me_list = List(**existing)
            else:
                self._me_list = self._make_me_list()
        return self._me_list