-
Notifications
You must be signed in to change notification settings - Fork 523
Add an Admin API endpoint to redact all a user's events #17506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
18f369d
9a7a304
2eb245b
13312d2
17d782a
81a1f2b
455198e
d32bc95
9ff860a
24afcd8
dcb156c
f8b055e
c92ca2d
22af98d
f1ac8f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add an asynchronous Admin API endpoint to redact all a user's events, and an endpoint to check on the status of that redaction task. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,20 +21,43 @@ | |
|
|
||
| import abc | ||
| import logging | ||
| from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Dict, | ||
| List, | ||
| Mapping, | ||
| Optional, | ||
| Sequence, | ||
| Set, | ||
| Tuple, | ||
| ) | ||
|
|
||
| import attr | ||
|
|
||
| from synapse.api.constants import Direction, Membership | ||
| from synapse.api.constants import Direction, EventTypes, Membership | ||
| from synapse.api.errors import SynapseError | ||
| from synapse.events import EventBase | ||
| from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo | ||
| from synapse.types import ( | ||
| JsonMapping, | ||
| Requester, | ||
| RoomStreamToken, | ||
| ScheduledTask, | ||
| StateMap, | ||
| TaskStatus, | ||
| UserID, | ||
| UserInfo, | ||
| create_requester, | ||
| ) | ||
| from synapse.visibility import filter_events_for_client | ||
|
|
||
| if TYPE_CHECKING: | ||
| from synapse.server import HomeServer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events" | ||
|
|
||
|
|
||
| class AdminHandler: | ||
| def __init__(self, hs: "HomeServer"): | ||
|
|
@@ -43,6 +66,20 @@ def __init__(self, hs: "HomeServer"): | |
| self._storage_controllers = hs.get_storage_controllers() | ||
| self._state_storage_controller = self._storage_controllers.state | ||
| self._msc3866_enabled = hs.config.experimental.msc3866.enabled | ||
| self.event_creation_handler = hs.get_event_creation_handler() | ||
| self._task_scheduler = hs.get_task_scheduler() | ||
|
|
||
| self._task_scheduler.register_action( | ||
| self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME | ||
| ) | ||
|
|
||
| async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]: | ||
| """Get the current status of an active redaction process | ||
|
|
||
| Args: | ||
| redact_id: redact_id returned by start_redact_events. | ||
| """ | ||
| return await self._task_scheduler.get_task(redact_id) | ||
|
|
||
| async def get_whois(self, user: UserID) -> JsonMapping: | ||
| connections = [] | ||
|
|
@@ -313,6 +350,152 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> | |
|
|
||
| return writer.finished() | ||
|
|
||
| async def start_redact_events( | ||
| self, | ||
| user_id: str, | ||
| rooms: list, | ||
| requester: JsonMapping, | ||
| reason: Optional[str], | ||
| limit: Optional[int], | ||
| ) -> str: | ||
| """ | ||
| Start a task redacting the events of the given user in the given rooms | ||
|
|
||
| Args: | ||
| user_id: the user ID of the user whose events should be redacted | ||
| rooms: the rooms in which to redact the user's events | ||
| requester: the user requesting the events | ||
| reason: reason for requesting the redaction, ie spam, etc | ||
| limit: limit on the number of events in each room to redact | ||
|
|
||
| Returns: | ||
| a unique ID which can be used to query the status of the task | ||
| """ | ||
| active_tasks = await self._task_scheduler.get_tasks( | ||
| actions=[REDACT_ALL_EVENTS_ACTION_NAME], | ||
| resource_id=user_id, | ||
| statuses=[TaskStatus.ACTIVE], | ||
| ) | ||
|
|
||
| if len(active_tasks) > 0: | ||
| raise SynapseError( | ||
| 400, "Redact already in progress for user %s" % (user_id,) | ||
| ) | ||
|
|
||
| redact_id = await self._task_scheduler.schedule_task( | ||
| REDACT_ALL_EVENTS_ACTION_NAME, | ||
| resource_id=user_id, | ||
| params={ | ||
| "rooms": rooms, | ||
| "requester": requester, | ||
| "user_id": user_id, | ||
| "reason": reason, | ||
| "limit": limit, | ||
| }, | ||
| ) | ||
|
|
||
| logger.info( | ||
| "starting redact events with redact_id %s", | ||
| redact_id, | ||
| ) | ||
|
|
||
| return redact_id | ||
|
|
||
| async def _redact_all_events( | ||
| self, task: ScheduledTask | ||
| ) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]: | ||
| """ | ||
| Task to redact all of a users events in the given rooms, tracking which, if any, events | ||
| whose redaction failed | ||
| """ | ||
|
|
||
| assert task.params is not None | ||
| rooms = task.params.get("rooms") | ||
| assert rooms is not None | ||
|
|
||
| r = task.params.get("requester") | ||
| assert r is not None | ||
| admin = Requester.deserialize(self._store, r) | ||
|
|
||
| user_id = task.params.get("user_id") | ||
| assert user_id is not None | ||
|
|
||
| requester = create_requester( | ||
| user_id, authenticated_entity=admin.user.to_string() | ||
| ) | ||
|
|
||
| reason = task.params.get("reason") | ||
| limit = task.params.get("limit") | ||
|
H-Shay marked this conversation as resolved.
|
||
|
|
||
| if not limit: | ||
| limit = 1000 | ||
|
H-Shay marked this conversation as resolved.
Outdated
|
||
|
|
||
| result: Mapping[str, Any] = ( | ||
| task.result if task.result else {"failed_redactions": {}} | ||
| ) | ||
| for room in rooms: | ||
| room_version = await self._store.get_room_version(room) | ||
| event_ids = await self._store.get_events_sent_by_user_in_room( | ||
| user_id, | ||
| room, | ||
| limit, | ||
| ["m.room.member", "m.room.message"], | ||
| ) | ||
| if not event_ids: | ||
| # there's nothing to redact | ||
| return TaskStatus.COMPLETE, result, None | ||
|
|
||
| events = await self._store.get_events_as_list(event_ids) | ||
| for event in events: | ||
| # we care about join events but not other membership events | ||
| if event.type == "m.room.member": | ||
| content = event.content | ||
| if content: | ||
| if content.get("membership") == "Membership.JOIN": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh err,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right I have switched it back and ensured that the tests are verifying that we are redacting the join event.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for updating the tests! Using |
||
| pass | ||
| else: | ||
| continue | ||
| relations = await self._store.get_relations_for_event( | ||
| room, event.event_id, event, event_type=EventTypes.Redaction | ||
| ) | ||
|
|
||
| # if we've already successfully redacted this event then skip processing it | ||
| if relations[0]: | ||
| continue | ||
|
|
||
| event_dict = { | ||
| "type": EventTypes.Redaction, | ||
| "content": {"reason": reason} if reason else {}, | ||
| "room_id": room, | ||
| "sender": user_id, | ||
| } | ||
| if room_version.updated_redaction_rules: | ||
| event_dict["content"]["redacts"] = event.event_id | ||
| else: | ||
| event_dict["redacts"] = event.event_id | ||
|
|
||
| try: | ||
| # set the prev event to the offending message to allow for redactions | ||
| # to be processed in the case where the user has been kicked/banned before | ||
| # redactions are requested | ||
| ( | ||
| redaction, | ||
| _, | ||
| ) = await self.event_creation_handler.create_and_send_nonmember_event( | ||
| requester, | ||
| event_dict, | ||
| prev_event_ids=[event.event_id], | ||
| ratelimit=False, | ||
| ) | ||
| except Exception as ex: | ||
| logger.info( | ||
| f"Redaction of event {event.event_id} failed due to: {ex}" | ||
| ) | ||
| result["failed_redactions"][event.event_id] = str(ex) | ||
| await self._task_scheduler.update_task(task.id, result=result) | ||
|
|
||
| return TaskStatus.COMPLETE, result, None | ||
|
|
||
|
|
||
| class ExfiltrationWriter(metaclass=abc.ABCMeta): | ||
| """Interface used to specify how to write exported data.""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.