import logging import random import re import time from nio import ( AsyncClient, MatrixRoom, MegolmEvent, RoomCreateError, RoomGetEventError, RoomInviteError, RoomMessagesError, RoomMessageText, RoomPutStateResponse, RoomSendError, UnknownEvent, ) from vetting_bot.chat_functions import react_to_event, send_text_to_room from vetting_bot.config import Config from vetting_bot.storage import Storage from vetting_bot.timer import Timer logger = logging.getLogger(__name__) class Command: def __init__( self, client: AsyncClient, store: Storage, config: Config, command: str, room: MatrixRoom, event: RoomMessageText, ): """A command made by a user. Args: client: The client to communicate to matrix with. store: Bot storage. config: Bot configuration parameters. command: The command and arguments. room: The room the command was sent in. event: The event describing the command. """ self.client = client self.store = store self.config = config self.command = command self.room = room self.event = event self.args = self.command.split()[1:] logger.info("Running command `%s` because of %s", command, event.sender) async def process(self): """Process the command""" if self.command.startswith("echo"): await self._echo() elif self.command.startswith("react"): await self._react() elif self.command.startswith("help"): await self._show_help() elif self.command.startswith("start"): await self._start_vetting() elif self.command.startswith("vote"): await self._start_vote() elif self.command.startswith("results"): await self._get_poll_results() else: await self._unknown_command() async def _echo(self): """Echo back the command's arguments""" response = " ".join(self.args) await send_text_to_room(self.client, self.room.room_id, response) async def _react(self): """Make the bot react to the command message""" # React with a start emoji reaction = "⭐" await react_to_event( self.client, self.room.room_id, self.event.event_id, reaction ) # React with some generic text reaction = "Some text" await react_to_event( self.client, self.room.room_id, self.event.event_id, reaction ) async def _show_help(self): """Show the help text""" if not self.args: text = ( "Hello, I am the vetting bot! Use `help commands` to view " "available commands." ) await send_text_to_room(self.client, self.room.room_id, text) return topic = self.args[0] if topic == "rules": text = "These are the rules!" elif topic == "commands": text = ( "Available commands: \n" "`start {user_id}`\n" "`vote {user_id}`\n" "`results {poll_event_id}`\n" ) else: text = "Unknown help topic!" await send_text_to_room(self.client, self.room.room_id, text) async def _start_vetting(self): """Starts the vetting process""" if self.room.room_id != self.config.vetting_room_id: text = f"This command can only be used in https://matrix.to/#/{self.config.vetting_room_id} !" await send_text_to_room(self.client, self.room.room_id, text) return if not self.args: text = "Usage: `start {user_id}\nExample: `start @someone:example.com`" await send_text_to_room(self.client, self.room.room_id, text) return vetted_user_id = self.args[0] if not validate_user_id(vetted_user_id): text = ( "The entered user id is invalid. " f"It should be in the format of `{self.client.user_id}`" ) await send_text_to_room(self.client, self.room.room_id, text) return # Check if vetting room already exists for user self.store.cursor.execute( "SELECT room_id FROM vetting WHERE mxid=?", (vetted_user_id,) ) row = self.store.cursor.fetchone() if row is not None: logger.warn("Vetting room already exists for %s", vetted_user_id) text = f"A vetting room already exists for this user: https://matrix.to/#/{row[0]}" await send_text_to_room(self.client, self.room.room_id, text) return logger.info("Creating vetting room for %s", vetted_user_id) # Get members to invite invitees = set( [ user.user_id for user in self.room.users.values() if user.power_level >= self.config.power_level_invite ] ) invitees.add(vetted_user_id) # Invite user to vet invitees.add(self.event.sender) # Invite user that sent the command invitees.remove(self.client.user_id) # Create new room random_string = hex(random.randrange(4096, 65535))[2:].upper() initial_state = [ # Enable encryption { "type": "m.room.encryption", "content": {"algorithm": "m.megolm.v1.aes-sha2"}, "state_key": "", }, # Make room joinable by federation members { "type": "m.room.join_rules", "state_key": "", "content": { "join_rule": "restricted", "allow": [ { "room_id": self.config.main_space_id, "type": "m.room_membership", }, { "room_id": self.config.vetting_space_id, "type": "m.room_membership", }, ], }, }, # Show message history to new members { "type": "m.room.history_visibility", "state_key": "", "content": {"history_visibility": "shared"}, }, ] # Max amount of users to invite is 10 here: invitee_chunks = divide_chunks(invitees, 10) room_resp = await self.client.room_create( name=f"Vetting {random_string}", invite=next(invitee_chunks), initial_state=initial_state, ) if isinstance(room_resp, RoomCreateError): text = f"Unable to create room: {room_resp}" await send_text_to_room(self.client, self.room.room_id, text) logging.error(text, stack_info=True) return # Invite more people for chunk in invitee_chunks: for mxid in chunk: invite_resp = await self.client.room_invite(room_resp.room_id, mxid) if isinstance(invite_resp, RoomInviteError): text = ( f"Unable to invite {mxid} to newly created room: {invite_resp}" ) await send_text_to_room(self.client, self.room.room_id, text) logging.error(text, stack_info=True) # Create new vetting entry self.store.cursor.execute( "INSERT INTO vetting (mxid, room_id, vetting_create_time) VALUES (?, ?, ?)", (vetted_user_id, room_resp.room_id, time.time()), ) logger.debug("Adding vetting room to space") # Add newly created room to space space_child_content = { "suggested": False, "via": [self.client.server], } space_resp = await self.client.room_put_state( room_id=self.config.vetting_space_id, event_type="m.space.child", content=space_child_content, state_key=room_resp.room_id, ) if not isinstance(space_resp, RoomPutStateResponse): logging.error("Failed to add room to space: %s", space_resp, exc_info=True) vetted_user_server = vetted_user_id.split(":", maxsplit=1)[1] vetting_room_link = f"https://matrix.to/#/{room_resp.room_id}?via={self.client.server}&via={vetted_user_server}" text = f"Created vetting room for https://matrix.to/#/{vetted_user_id}: {vetting_room_link}" await send_text_to_room(self.client, self.room.room_id, text) logger.info("Vetting room set up for %s", vetted_user_id) async def _start_vote(self): """Starts the vote""" if self.room.room_id != self.config.vetting_room_id: text = f"This command can only be used in https://matrix.to/#/{self.config.vetting_room_id} !" await send_text_to_room(self.client, self.room.room_id, text) return if not self.args: text = "Usage: `vote {user_id}\nExample: `vote @someone:example.com`" await send_text_to_room(self.client, self.room.room_id, text) return vetted_user_id = self.args[0] # Check if vetting room exists for user and poll hasn't been started yet self.store.cursor.execute( "SELECT room_id, poll_event_id, room_id FROM vetting WHERE mxid=?", (vetted_user_id,), ) row = self.store.cursor.fetchone() if row is None: text = "This user hasn't been vetted, can't vote on them!" await send_text_to_room(self.client, self.room.room_id, text) return if row[1] is not None: event_link = f"https://matrix.to/#/{self.config.vetting_room_id}/{row[1]}?via={self.client.server}" text = f"A poll has already been started for this user: {event_link}" await send_text_to_room(self.client, self.room.room_id, text) return vetting_room_id = row[2] poll_text = f"Accept {vetted_user_id} into the Federation?" choices = ["yes", "no", "blank"] choices_text = "".join( [ f"\n{i}. {choice.title()}" for choice, i in zip(choices, range(1, len(choices) + 1)) ] ) answers = [ {"id": choice, "org.matrix.msc1767.text": choice.title()} for choice in choices ] event_content = { "org.matrix.msc1767.text": f"{poll_text}{choices_text}", "org.matrix.msc3381.poll.start": { "kind": "org.matrix.msc3381.poll.disclosed", "max_selections": 1, "question": { "org.matrix.msc1767.text": poll_text, }, "answers": answers, }, } poll_resp = await self.client.room_send( self.room.room_id, message_type="org.matrix.msc3381.poll.start", content=event_content, ignore_unverified_devices=True, ) if isinstance(poll_resp, RoomSendError): logging.error(poll_resp, stack_info=True) text = f"Failed to send poll: {poll_resp}" await send_text_to_room(self.client, self.room.room_id, text) return voting_start_time = time.time() self.store.cursor.execute( "UPDATE vetting SET poll_event_id = ?, voting_start_time = ? WHERE mxid = ?", (poll_resp.event_id, voting_start_time, vetted_user_id), ) timer = Timer(self.client, self.store, self.config) await timer.wait_for_poll_end( vetted_user_id, poll_resp.event_id, voting_start_time ) # Send link to vetting room vetted_user_server = vetted_user_id.split(":", maxsplit=1)[1] vetting_room_link = f"https://matrix.to/#/{vetting_room_id}?via={self.client.server}&via={vetted_user_server}" msg_content = { "m.relates_to": {"rel_type": "m.thread", "event_id": poll_resp.event_id}, "msgtype": "m.text", "body": f"Vetting room: {vetting_room_link}", } msg_resp = await self.client.room_send( self.room.room_id, message_type="m.room.message", content=msg_content, ignore_unverified_devices=True, ) if isinstance(msg_resp, RoomSendError): logging.error(msg_resp, stack_info=True) text = f"Failed to send vetting room link: {msg_resp}" await send_text_to_room(self.client, self.room.room_id, text) return async def _get_poll_results(self): poll_event_id = self.args[0] # Get answer ID lookup poll_event_resp = await self.client.room_get_event( room_id=self.room.room_id, event_id=poll_event_id ) if isinstance(poll_event_resp, RoomGetEventError): logging.error(poll_event_resp, stack_info=True) text = f"Unable to get poll event: {poll_event_resp}" await send_text_to_room(self.client, self.room.room_id, text) return event = poll_event_resp.event if isinstance(event, MegolmEvent): try: event = self.client.decrypt_event(event) except Exception as e: text = f"Could not decrypt event: {e}" logger.error(text) await send_text_to_room(self.client, self.room.room_id, text) return logger.debug("Event source: %s", event.source) poll_body = event.source.get("content") if "org.matrix.msc3381.poll.start" in poll_body: answer_ids = { answer["id"]: answer["org.matrix.msc1767.text"] for answer in poll_body["org.matrix.msc3381.poll.start"]["answers"] } # elif "org.matrix.msc3381.poll" in poll_body: # Newer version of polls # answer_ids = { # answer["org.matrix.msc3381.id"]: answer["org.matrix.msc1767.text"][0]["body"] # for answer in poll_body["org.matrix.msc3381.poll"]["answers"] # } else: logging.error(f"Not a poll event: {poll_event_resp}; {poll_body}") text = f"This isn't a poll event!" await send_text_to_room(self.client, self.room.room_id, text) return # Gather votes # TODO: DRY message_filter = { "rooms": [self.room.room_id], } vote_count = {} users_voted = set() user_votes = {} # Loop until we find all events that could be related to the poll # (max 20 times: 20 * 20 = up to 400 events deep or until we find the poll event) start_token = "" for _ in range(0, 20): logger.debug("Requesting events") message_resp = await self.client.room_messages( room_id=self.room.room_id, start=start_token, limit=20, message_filter=message_filter, ) if isinstance(message_resp, RoomMessagesError): logging.error(message_resp, stack_info=True) text = "Unable to gather votes." await send_text_to_room(self.client, self.room.room_id, text) return # Resume next request where this ends start_token = message_resp.end # Count votes for event in message_resp.chunk: # Only process poll response events if not isinstance(event, UnknownEvent): continue if event.type != "org.matrix.msc3381.poll.response": continue content = event.source.get("content") try: # Check if this response is for the correct poll related_event_id = content["m.relates_to"]["event_id"] if related_event_id != poll_event_id: continue # Add vote to count answer = content["org.matrix.msc3381.poll.response"]["answers"][0] # Only count the last poll response event if event.sender in users_voted: continue users_voted.add(event.sender) if answer not in vote_count: vote_count[answer] = 0 vote_count[answer] += 1 sender = event.source.get("sender") user_votes[sender] = answer except KeyError: pass # Check if we found the initial poll event if any([event.event_id == poll_event_id for event in message_resp.chunk]): break poll_link = f"https://matrix.to/#/{self.room.room_id}/{poll_event_id}" text = f"Results for {poll_link}:
" + "
".join( [ f"{voter} voted for _{answer_ids[answer_id]}_ (`{answer_id}`)" for voter, answer_id in user_votes.items() ] ) await send_text_to_room(self.client, self.room.room_id, text) async def _unknown_command(self): await send_text_to_room( self.client, self.room.room_id, f"Unknown command '{self.command}'. Try the 'help' command for more information.", ) def validate_user_id(user_id): return ( re.match( ( r"^@[!-9;-~]*:" r"((\d{1,3}\.){3}\d{1,3}|\[[0-9A-Fa-f:.]{2,45}\]|[0-9A-Za-z.-]{1,255})(:\d{1,5})?$" ), user_id, ) is not None ) # Yield successive n-sized chunks from l. def divide_chunks(l, n: int): # looping till length l for i in range(0, len(l), n): yield l[i : i + n]