164 lines
5.3 KiB
Python
164 lines
5.3 KiB
Python
import pdb
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
from masto_bridges.config import Config
|
|
from masto_bridges.models import Account, List
|
|
from masto_bridges.post import Post, Status
|
|
from masto_bridges.logger import init_logger
|
|
from masto_bridges.repo import Repo
|
|
from masto_bridges.caldav import CalDAV
|
|
|
|
from mastodon import Mastodon, StreamListener
|
|
|
|
class Listener(StreamListener):
|
|
def __init__(self, client: Mastodon, config: Optional[Config] = None):
|
|
super(Listener, self).__init__()
|
|
self.client = client
|
|
|
|
if config is None:
|
|
config = Config()
|
|
self.config = config
|
|
|
|
self.logger = init_logger('mastogit_bot-stream', basedir=self.config.LOGDIR)
|
|
|
|
self.repo = None
|
|
if self.config.ENABLE_GIT:
|
|
self.repo = Repo(path=config.GIT_REPO)
|
|
|
|
self.caldav = None
|
|
if self.config.ENABLE_CALDAV is not None:
|
|
self.caldav = CalDAV(config=self.config)
|
|
|
|
|
|
|
|
def on_update(self, status:dict):
|
|
try:
|
|
status = Status(**status)
|
|
except:
|
|
self.logger.warning(f'Unhandled event: {status}')
|
|
return
|
|
|
|
if status.visibility in ('private', 'direct'):
|
|
# not xposting dms
|
|
self.logger.info('Not xposting private messages')
|
|
return
|
|
|
|
post = Post.from_status(status)
|
|
if post.text.startswith('xpost'):
|
|
self.logger.info('Not xposting an xpost')
|
|
return
|
|
|
|
|
|
|
|
success = False
|
|
if self.repo:
|
|
git_success = self.repo.post(post.format_commit())
|
|
else:
|
|
git_success = True
|
|
|
|
if self.caldav:
|
|
caldav_success = self.caldav.post(post.format_caldav())
|
|
else:
|
|
caldav_success = False
|
|
|
|
success = git_success and caldav_success
|
|
|
|
if success:
|
|
self.logger.info('Posted to bridges!')
|
|
else:
|
|
self.logger.exception('Failed to post to bridges!')
|
|
|
|
def on_unknown_event(self, name, unknown_event = None):
|
|
"""An unknown mastodon API event has been received. The name contains the event-name and unknown_event
|
|
contains the content of the unknown event.
|
|
|
|
This function must be implemented, if unknown events should be handled without an error.
|
|
"""
|
|
self.logger.exception(f'Unknown event type: {name}')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Bot:
|
|
def __init__(self, config:Optional[Config]=None, post_length=500):
|
|
self._me = None # type: Optional[Account]
|
|
self._me_list = None # type: Optional[List]
|
|
|
|
if config is None:
|
|
config = Config()
|
|
|
|
self.config = config
|
|
self.config.LOGDIR.mkdir(exist_ok=True)
|
|
self.post_length = post_length
|
|
self.logger = init_logger('mastogit_bot', basedir=self.config.LOGDIR)
|
|
|
|
self.client = Mastodon(
|
|
access_token=self.config.MASTO_TOKEN,
|
|
api_base_url=self.config.MASTO_URL
|
|
)
|
|
|
|
self.caldav = None
|
|
if self.config.ENABLE_CALDAV:
|
|
self.caldav = CalDAV(self.config)
|
|
|
|
def init_stream(self, run_async:bool=True):
|
|
# Listen to a stream consisting of just us.
|
|
listener = Listener(client=self.client, config=self.config)
|
|
self.logger.info('Initializing streaming')
|
|
if self.config.STREAM_MODE == 'list':
|
|
self.client.stream_list(
|
|
self.me_list.id,
|
|
listener = listener,
|
|
run_async=run_async
|
|
)
|
|
elif self.config.STREAM_MODE == 'home':
|
|
self.client.stream_user(
|
|
listener=listener,
|
|
run_async=run_async
|
|
)
|
|
|
|
def post(self, post:str):
|
|
# TODO: Split long posts
|
|
if len(post)>self.post_length:
|
|
raise NotImplementedError(f"Cant split long posts yet, got post of length {len(post)} when max length is {self.post_length}")
|
|
|
|
self.client.status_post(post)
|
|
self.logger.info(f"Posted:\n{post}")
|
|
|
|
def poll_caldav(self):
|
|
events = self.caldav.poll()
|
|
if len(events) > 0:
|
|
for event in events:
|
|
post = Post.from_vevent(event.vobject_instance.vevent)
|
|
if post.cal_user.split('@')[0] != self.me.acct and post.cal_user != self.me.acct:
|
|
self.logger.debug(f"Not posting calendar event not from us. Got cal user: {post.cal_user} and we are {self.me.acct}")
|
|
continue
|
|
if post.cal_url:
|
|
self.logger.debug("Not posting a post we made on masto back to masto")
|
|
continue
|
|
self.post(post.format_masto())
|
|
|
|
@property
|
|
def me(self) -> Account:
|
|
if self._me is None:
|
|
self._me = Account(**self.client.me())
|
|
return self._me
|
|
|
|
def _make_me_list(self) -> List:
|
|
me_list = List(**self.client.list_create('me'))
|
|
self.client.list_accounts_add(me_list.id, [self.me.id])
|
|
self.logger.info('Created list with just me in it!')
|
|
return me_list
|
|
|
|
@property
|
|
def me_list(self) -> List:
|
|
if self._me_list is None:
|
|
lists = self.client.lists()
|
|
me_list = [l for l in lists if l.get('title', '') == 'me']
|
|
if len(me_list)>0:
|
|
self._me_list = List(**me_list[0])
|
|
else:
|
|
self._me_list = self._make_me_list()
|
|
return self._me_list
|