mastodon-ld/masto_ld/bots/mastodon.py

185 lines
5.5 KiB
Python

import pdb
from typing import Optional
from urllib.parse import urlsplit
from masto_ld.config import Config
from masto_ld.models.models import Account, List
from masto_ld.models.post import Status
from masto_ld.logger import init_logger
from masto_ld.patterns.tag import Namespaced_Tag
from masto_ld.interfaces.smw import SMW
from masto_ld.interfaces.mediawiki import Wiki
import pypandoc
from mastodon import Mastodon, StreamListener
class UserListener(StreamListener):
"""
Listener for all events that are in our feed
"""
def __init__(self, client:Mastodon, config:Config):
self.client = client
self.config = config
self.logger = init_logger(name="user-listener", basedir=config.LOGDIR)
def on_update(self, status):
status = Status(**status)
tag = Namespaced_Tag.from_html(status.content)
if tag is not None:
self.handle_tagged_post(status, tag)
pdb.set_trace()
def on_notification(self, notification):
pdb.set_trace()
if notification.get('type', '') == 'follow':
self.logger.debug(f'following back {notification["account"]["url"]}')
self.follow_back(notification)
def follow_back(self, notification):
self.client.account_follow(notification['account']['id'])
self.logger.debug(f'account followed! {notification["account"]["url"]} with id {notification["account"]["id"]}')
def handle_tagged_post(self, status:Status, tag:Namespaced_Tag):
# FIXME: This is the worst way of doing this
# resolve the name of the tag
# find the right field
field = [f for f in status.account.fields if f.name == tag.tags[0]]
if len(field) == 0:
raise ValueError('No namespace found!')
else:
field = field[0]
# get the tags from that field
smw = SMW(self.config.WIKI_URL)
pagename = urlsplit(field.url).path.lstrip('/')
wiki_tags = smw.tags(pagename)
# find the long name of the tag
longname = [t for t in wiki_tags if t[0] == tag.tags[1]]
if len(longname) == 0:
raise ValueError('No matching tag found!')
else:
longname = longname[0]
# format our freaking page IN PLACE LIKE AN ANIMAL
page_content = (
"{{Thread\n",
f"|About={longname}\n",
f"|URL={status.url}\n",
"}}\n\n"
)
page_content += pypandoc.convert_text(status.content, 'mediawiki-raw_html', format='html')
page_name = status.soup.find('h1')
if page_name is None:
raise ValueError('No Header found!')
page_
wiki = Wiki(self.config, log_dir=self.config.LOGDIR)
class ListListener(StreamListener):
"""
Listener for events from users on our follows list
"""
def __init__(self, client: Mastodon, config:Optional[Config]=None):
super(Listener, self).__init__()
self.client = client
if config is None:
config = Config()
self.config = config
self.logger = init_logger('mastogit_bot-stream', basedir=self.config.LOGDIR)
self.repo = Repo(path=config.GIT_REPO)
def on_update(self, status:dict):
status = Status(**status)
if status.visibility in ('private', 'direct'):
# not xposting dms
self.logger.info('Not xposting private messages')
return
post = Post.from_status(status)
if post.text.startswith('xpost'):
self.logger.info('Not xposting an xpost')
return
success = self.repo.post(post.format_commit())
if success:
self.logger.info('Posted to git!')
else:
self.logger.exception('Failed to post to git!')
class Bot:
def __init__(self, config:Optional[Config]=None, post_length=500):
self._me = None # type: Optional[Account]
# self._me_list = None # type: Optional[List]
if config is None:
config = Config()
self.config = config
self.config.LOGDIR.mkdir(exist_ok=True)
# self.post_length = post_length
self.logger = init_logger('mastogit_bot', basedir=self.config.LOGDIR)
self.client = Mastodon(
access_token=self.config.MASTO_TOKEN,
api_base_url=self.config.MASTO_URL
)
def init_stream(self, run_async:bool=True):
# Listen to a stream consisting of just us.
listener = UserListener(self.client, self.config)
self.logger.info('Initializing streaming')
self.client.stream_user(
listener = listener,
run_async=run_async
)
def post(self, post:str):
self.client.status_post(post)
self.logger.info(f"Posted:\n{post}")
@property
def me(self) -> Account:
if self._me is None:
self._me = Account(**self.client.me())
return self._me
def _make_me_list(self) -> List:
me_list = List(**self.client.list_create('me'))
self.client.list_accounts_add(me_list.id, [self.me.id])
self.logger.info('Created list with just me in it!')
return me_list
# @property
# def me_list(self) -> List:
# if self._me_list is None:
# lists = self.client.lists()
# me_list = [l for l in lists if l.get('title', '') == 'me']
# if len(me_list)>0:
# self._me_list = List(**me_list[0])
# else:
# self._me_list = self._make_me_list()
# return self._me_list