import pdb import sys import typing from typing import Optional, Union, Tuple, List, Dict from wiki_postbot.actions.action import Action, Result from wiki_postbot.logger import init_logger from dataclasses import dataclass import re from tweepy import Response, StreamResponse import parse @dataclass class Command_Params: username: str command: str args: Optional[dict] class Command(Action): """ An action invoked by tweeting at the bot and telling it to do something. Actions are usually not embedded within othter text, but exist as their own messages, as opposed to :class:`.Inline` actions """ name:str command_str = r"@(?P\S{1,})\s*(?P\w*)" arg_str = r"^(\w*?):\s*(.*)" @dataclass class Arguments: pass def __init__(self, *args, **kwargs): super(Command, self).__init__(*args, **kwargs) self.logger = init_logger('wiki_postbot.action.command.'+self.name, self.bot.basedir) self.command_pattern = re.compile(self.command_str) self.arg_pattern = re.compile(self.arg_str, re.MULTILINE) @classmethod def parse(cls, tweet:Union[str, Response]) -> Union[None, Command_Params]: if isinstance(tweet, (Response, StreamResponse)): tweet = tweet.data.text command_pattern = re.compile(cls.command_str) arg_pattern = re.compile(cls.arg_str, re.MULTILINE) command_tup = command_pattern.findall(tweet) command_tup = [c for c in command_tup if len(c) == 2 and len(c[0])>0 and len(c[1])>0] if len(command_tup) == 0: return None else: param_dict = {'username': command_tup[0][0], 'command': command_tup[0][1]} args = arg_pattern.findall(tweet) if args is not None: param_dict['args'] = dict(args) return Command_Params(**param_dict) def check(self, response:Response) -> bool: """ Check if the response has a first line like:: @{bot.username}: {Command.name} Args: response: Returns: bool """ # like # "{@ # split into lines lines = str(response.data.text).split('\n') test_str = lines[0].strip() self.logger.debug(f"Testing string: {test_str}") command_tup = self.command_pattern.findall(test_str) command_tup = [c for c in command_tup if len(c) == 2 and len(c[0]) > 0 and len(c[1]) > 0] if len(command_tup) == 0: parse_res = None else: parse_res = {'username': command_tup[0][0], 'command': command_tup[0][1]} self.logger.debug(f"parse result: {parse_res}") if parse_res is None: self.logger.debug('parse match was None') return False if parse_res['username'] == self.bot.username and \ parse_res['command'] == self.name: return True else: return False def get(self, response:Response) -> Union[Command_Params, None]: """Get the value of a command Response must have the 'user_id' expansion """ matched = self.bot.client.search_recent_tweets( f"from:{response.includes['users'][0]['username']} @{self.bot.username} \"{self.name}\"", user_auth=True) self.logger.debug("matched get request") self.logger.debug(matched) if matched.data is None: return None all_params = [self.parse(d.text) for d in matched.data] found_params = [p for p in all_params if len(p.args)>0] if len(found_params)>0: return found_params[0] else: return all_params[0] def arg_dict(self, tweet:Union[str, Response]) -> dict: """ Get arguments given to this command """ if isinstance(tweet, Response): tweet = tweet.data.text return dict(self.arg_pattern.findall(tweet)) @classmethod def example(cls) -> str: ex_str = f"@ {cls.name}" keys = [f"{key}:" for key in cls.Arguments.__dataclass_fields__.keys()] ex_str = "\n".join([ex_str, *keys]) return ex_str class Identify(Command): name="identify" @dataclass class Arguments: name: str affiliation: Optional[str] orcid: Optional[str] def do(self, response:Response) -> Result: params = self.parse(response) if len(params.args) == 0: id = self.get(response) if id is None or len(id.args)==0: reply = "No Identity Found! Set one by tweeting:\n"+self.example() log = "Help message given" else: reply = "Previous Identity Found: \n" + "\n".join([f"{key}: {val}" for key, val in id.args.items()]) log = "previous identity given" else: reply = "Identity registered! This will be used to identify all threads from your username. If you need to update or remove your identity, delete this tweet, your information is not stored anywhere else." log = f"Identity registered" return Result(ok=True, reply=reply, log=log)