Skip to content
1 change: 1 addition & 0 deletions changelog.d/17506.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an asynchronous Admin API endpoint to redact all a user's events, and an endpoint to check on the status of that redaction task.
Comment thread
H-Shay marked this conversation as resolved.
Outdated
70 changes: 70 additions & 0 deletions docs/admin_api/user_admin_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1361,3 +1361,73 @@ Returns a `404` HTTP status code if no user was found, with a response body like
```

_Added in Synapse 1.72.0._


## Redact all the events of a user

The API is
```
POST /_synapse/admin/v1/user/$user_id/redact

{
Comment thread
H-Shay marked this conversation as resolved.
"rooms": [!roomid1, !roomid2]
Comment thread
H-Shay marked this conversation as resolved.
Outdated
}
```
If an empty dict is provided as the key for `rooms`, all events in all the rooms the user is member of will be redacted,
Comment thread
H-Shay marked this conversation as resolved.
Outdated
otherwise all the events in the rooms provided in the request will be redacted.

The API starts redaction process running, and returns immediately with a JSON body with
a redact id which can be used to query the status of the redaction process:

```json
{
"redact_id": "<opaque id>"
}
```

**Parameters**

The following parameters should be set in the URL:

- `user_id` - The fully qualified MXID of the user: for example, `@user:server.com`.

The following JSON body parameter must be provided:

- `rooms` - A list of rooms to redact the user's events in, if an empty list is provided all events in all rooms
Comment thread
H-Shay marked this conversation as resolved.
Outdated
the user is a member of will be redacted

Comment thread
H-Shay marked this conversation as resolved.

## Check the status of a redaction process

It is possible to query the status of the background task for redacting a user's events.
The status can be queried up to 24 hours after completion of the task,
or until Synapse is restarted (whichever happens first).

The API is:

```
GET /_synapse/admin/v1/user/redact_status/$redact_id
```

A response body like the following is returned:

```
{
"status": "active",
"failed_redactions": [],
Comment thread
H-Shay marked this conversation as resolved.
}
```

**Parameters**

The following parameters should be set in the URL:

* `redact_id` - The ID for this redaction, provided when the redaction was requested.
Comment thread
H-Shay marked this conversation as resolved.
Outdated


**Response**

The following fields are returned in the JSON response body:

- status: one of scheduled/active/completed/failed, indicating the status of the redaction job
- failed: a list of event ids the process was unable to redact, if any
Comment thread
H-Shay marked this conversation as resolved.
Outdated
Comment thread
H-Shay marked this conversation as resolved.
Outdated
124 changes: 121 additions & 3 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,42 @@

import abc
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
)

import attr

from synapse.api.constants import Direction, Membership
from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
from synapse.types import (
JsonMapping,
Requester,
RoomStreamToken,
ScheduledTask,
StateMap,
TaskStatus,
UserID,
UserInfo,
)
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events"


class AdminHandler:
def __init__(self, hs: "HomeServer"):
Expand All @@ -43,6 +65,20 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
self.event_creation_handler = hs.get_event_creation_handler()
self._task_scheduler = hs.get_task_scheduler()

self._task_scheduler.register_action(
self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
)

async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active redaction process

Args:
redact_id: redact_id returned by start_redact_events.
"""
return await self._task_scheduler.get_task(redact_id)

async def get_whois(self, user: UserID) -> JsonMapping:
connections = []
Expand Down Expand Up @@ -305,6 +341,88 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->

return writer.finished()

async def start_redact_events(
self, user_id: str, rooms: list, requester: JsonMapping
) -> str:
"""
Start a task redacting the events of the given user in the givent rooms
Comment thread
H-Shay marked this conversation as resolved.
Outdated

Args:
user_id: the user ID of the user whose events should be redacted
rooms: the rooms in which to redact the user's events
requester: the user requesting the events
"""
active_tasks = await self._task_scheduler.get_tasks(
actions=[REDACT_ALL_EVENTS_ACTION_NAME],
resource_id=user_id,
statuses=[TaskStatus.ACTIVE],
)

if len(active_tasks) > 0:
raise SynapseError(
400, "Redact already in progress for user %s" % (user_id,)
)

redact_id = await self._task_scheduler.schedule_task(
REDACT_ALL_EVENTS_ACTION_NAME,
resource_id=user_id,
params={"rooms": rooms, "requester": requester, "user_id": user_id},
)

logger.info(
"starting redact events with redact_id %s",
redact_id,
)

return redact_id

async def _redact_all_events(
self, task: ScheduledTask
) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
"""
Task to redact all a users events in the given rooms, tracking which, if any, events
Comment thread
H-Shay marked this conversation as resolved.
Outdated
whose redaction failed
"""

assert task.params is not None
rooms = task.params.get("rooms")
assert rooms is not None

r = task.params.get("requester")
assert r is not None
requester = Requester.deserialize(self._store, r)

user_id = task.params.get("user_id")
assert user_id is not None

result: Dict[str, Any] = {"result": []}
Comment thread
H-Shay marked this conversation as resolved.
Outdated
for room in rooms:
room_version = await self._store.get_room_version(room)
events = await self._store.get_events_sent_by_user(user_id, room)

for event in events:
event_dict = {
"type": EventTypes.Redaction,
"content": {},
"room_id": room,
"sender": requester.user.to_string(),
}
if room_version.updated_redaction_rules:
event_dict["content"]["redacts"] = event[0]
else:
event_dict["redacts"] = event[0]

try:
await self.event_creation_handler.create_and_send_nonmember_event(
Comment thread
anoadragon453 marked this conversation as resolved.
Outdated
requester, event_dict
)
except Exception as ex:
logger.info(f"Redaction of event {event[0]} failed due to: {ex}")
result["result"].append(event[0])
Comment thread
H-Shay marked this conversation as resolved.
Outdated
await self._task_scheduler.update_task(task.id, result=result)

return TaskStatus.COMPLETE, result, None


class ExfiltrationWriter(metaclass=abc.ABCMeta):
"""Interface used to specify how to write exported data."""
Expand Down
4 changes: 4 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
DeactivateAccountRestServlet,
PushersRestServlet,
RateLimitRestServlet,
RedactUser,
RedactUserStatus,
ResetPasswordRestServlet,
SearchUsersRestServlet,
ShadowBanRestServlet,
Expand Down Expand Up @@ -319,6 +321,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server)
RedactUser(hs).register(http_server)
RedactUserStatus(hs).register(http_server)

DeviceRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
Expand Down
62 changes: 62 additions & 0 deletions synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -1410,3 +1410,65 @@ async def on_GET(
raise NotFoundError("User not found")

return HTTPStatus.OK, {"user_id": user_id}


class RedactUser(RestServlet):
"""
Redact all the events of a given user in the given rooms or if empty dict is provided
then all events in all rooms user is member of. Kicks off a background process and
returns an id that can be used to check on the progress of the redaction progress
"""

PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/redact")

def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self.admin_handler = hs.get_admin_handler()

async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)
Comment thread
anoadragon453 marked this conversation as resolved.

body = parse_json_object_from_request(request, allow_empty_body=True)
rooms = body["rooms"]
Comment thread
H-Shay marked this conversation as resolved.
Outdated

if rooms == {}:
Comment thread
H-Shay marked this conversation as resolved.
Outdated
rooms = await self._store.get_rooms_for_user(user_id)

redact_id = await self.admin_handler.start_redact_events(
user_id, list(rooms), requester.serialize()
)

return HTTPStatus.OK, {"redact_id": redact_id}


class RedactUserStatus(RestServlet):
"""
Check on the progress of the redaction request represented by the provided ID, returning
the status of the process and a list of events that were unable to be redacted, if any
"""

PATTERNS = admin_patterns("/user/redact_status/(?P<redact_id>[^/]*)$")

def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self.admin_handler = hs.get_admin_handler()

async def on_GET(
self, request: SynapseRequest, redact_id: str
) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self._auth, request)

task = await self.admin_handler.get_redact_task(redact_id)

if task:
assert task.result is not None
return HTTPStatus.OK, {
"status": task.status,
"failed_redactions": task.result["result"],
Comment thread
H-Shay marked this conversation as resolved.
Outdated
}
else:
raise NotFoundError("redact id '%s' not found" % redact_id)
21 changes: 21 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2439,3 +2439,24 @@ def mark_event_rejected_txn(
)

self.invalidate_get_event_cache_after_txn(txn, event_id)

async def get_events_sent_by_user(self, user_id: str, room_id: str) -> List[tuple]:
"""
Get a list of event ids of events sent by user in room
"""

def _get_events_by_user_txn(
Comment thread
H-Shay marked this conversation as resolved.
Outdated
txn: LoggingTransaction, user_id: str, room_id: str
) -> List[tuple]:
return self.db_pool.simple_select_many_txn(
txn,
"events",
Comment thread
anoadragon453 marked this conversation as resolved.
Outdated
"sender",
Comment thread
H-Shay marked this conversation as resolved.
Outdated
[user_id],
{"room_id": room_id},
retcols=["event_id"],
)

return await self.db_pool.runInteraction(
"get_events_by_user", _get_events_by_user_txn, user_id, room_id
)
Comment thread
anoadragon453 marked this conversation as resolved.
Outdated
Loading