Skip to content
1 change: 0 additions & 1 deletion docs/admin_api/user_admin_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,5 @@ The following fields are returned in the JSON response body:
- status: one of scheduled/active/completed/failed, indicating the status of the redaction job
- failed_redactions: a dict where the keys are event ids the process was unable to redact, if any, and the values are
the corresponding error that caused the redaction to fail
- successful_redactions: a list of event ids that were successfully redacted

_Added in Synapse 1.114.0._
Comment thread
H-Shay marked this conversation as resolved.
Outdated
61 changes: 46 additions & 15 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
TaskStatus,
UserID,
UserInfo,
create_requester,
)
from synapse.visibility import filter_events_for_client

Expand Down Expand Up @@ -406,32 +407,61 @@ async def _redact_all_events(

r = task.params.get("requester")
assert r is not None
requester = Requester.deserialize(self._store, r)
admin = Requester.deserialize(self._store, r)

user_id = task.params.get("user_id")
assert user_id is not None

requester = create_requester(
user_id, authenticated_entity=admin.user.to_string()
)

reason = task.params.get("reason")
limit = task.params.get("limit")
Comment thread
H-Shay marked this conversation as resolved.

if not limit:
limit = 1000
Comment thread
H-Shay marked this conversation as resolved.
Outdated

result: Mapping[str, Any] = (
task.result
if task.result
else {"failed_redactions": {}, "successful_redactions": []}
task.result if task.result else {"failed_redactions": {}}
)
for room in rooms:
room_version = await self._store.get_room_version(room)
events = await self._store.get_events_sent_by_user_in_room(
user_id, room, limit, "m.room.redaction"
event_ids = await self._store.get_events_sent_by_user_in_room(
user_id,
room,
limit,
[
"m.room.member",
"m.text",
"m.emote",
"m.image",
"m.file",
"m.audio",
"m.video",
Comment thread
H-Shay marked this conversation as resolved.
Outdated
],
)

if not events:
if not event_ids:
# there's nothing to redact
return TaskStatus.COMPLETE, result, None

events = await self._store.get_events_as_list(set(event_ids))
Comment thread
H-Shay marked this conversation as resolved.
Outdated
for event in events:
# we care about join events but not other membership events
if event.type == "m.room.member":
dict = event.get_dict()
content = dict.get("content")
Comment thread
H-Shay marked this conversation as resolved.
Outdated
if content:
if content.get("membership") == "join":
Comment thread
H-Shay marked this conversation as resolved.
Outdated
pass
else:
continue
relations = await self._store.get_relations_for_event(
room, event.event_id, event, event_type=EventTypes.Redaction
)

# if we've already successfully redacted this event then skip processing it
if event in result["successful_redactions"]:
if relations[0]:
continue

event_dict = {
Expand All @@ -441,9 +471,9 @@ async def _redact_all_events(
"sender": user_id,
}
if room_version.updated_redaction_rules:
event_dict["content"]["redacts"] = event
event_dict["content"]["redacts"] = event.event_id
else:
event_dict["redacts"] = event
event_dict["redacts"] = event.event_id

try:
# set the prev event to the offending message to allow for redactions
Expand All @@ -453,14 +483,15 @@ async def _redact_all_events(
await self.event_creation_handler.create_and_send_nonmember_event(
requester,
Comment thread
anoadragon453 marked this conversation as resolved.
Outdated
event_dict,
prev_event_ids=[event],
prev_event_ids=[event.event_id],
ratelimit=False,
)
)
result["successful_redactions"].append(event)
except Exception as ex:
logger.info(f"Redaction of event {event} failed due to: {ex}")
result["failed_redactions"][event] = str(ex)
logger.info(
f"Redaction of event {event.event_id} failed due to: {ex}"
)
result["failed_redactions"][event.event_id] = str(ex)
await self._task_scheduler.update_task(task.id, result=result)

return TaskStatus.COMPLETE, result, None
Expand Down
4 changes: 0 additions & 4 deletions synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,13 +1475,9 @@ async def on_GET(
elif task.status == TaskStatus.COMPLETE:
assert task.result is not None
failed_redactions = task.result.get("failed_redactions")
successful_redactions = task.result.get("successful_redactions")
return HTTPStatus.OK, {
"status": TaskStatus.COMPLETE,
"failed_redactions": failed_redactions if failed_redactions else {},
"successful_redactions": (
successful_redactions if successful_redactions else []
),
}
elif task.status == TaskStatus.SCHEDULED:
return HTTPStatus.OK, {"status": TaskStatus.SCHEDULED}
Expand Down
53 changes: 33 additions & 20 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2441,36 +2441,52 @@ def mark_event_rejected_txn(
self.invalidate_get_event_cache_after_txn(txn, event_id)

async def get_events_sent_by_user_in_room(
self, user_id: str, room_id: str, limit: Optional[int], filter: str = "none"
self, user_id: str, room_id: str, limit: int, filter: Optional[List[str]] = None
) -> Optional[List[str]]:
"""
Get a list of event ids and event info of events sent by the user in the specified room
Get a list of event ids of events sent by the user in the specified room

Args:
user_id: user ID to search against
room_id: room ID of the room to search for events in
filter: type of event to filter out
filter: type of events to filter for
limit: maximum number of event ids to return
"""

def _get_events_by_user_txn(
def _get_events_by_user_in_room_txn(
txn: LoggingTransaction,
user_id: str,
room_id: str,
filter: Optional[str],
filter: Optional[List[str]],
batch_size: int,
offset: int,
) -> Tuple[Optional[List[str]], int]:

sql = """
SELECT event_id FROM events
WHERE sender = ? AND room_id = ?
AND type != ?
ORDER BY received_ts DESC
LIMIT ?
OFFSET ?
"""
txn.execute(sql, (user_id, room_id, filter, batch_size, offset))
if filter:
filter_sql = " AND type in ("
for i, _ in enumerate(filter):
if i < len(filter) - 1:
filter_sql += "?, "
else:
filter_sql += "?)"
Comment thread
H-Shay marked this conversation as resolved.
Outdated

sql = f"""
SELECT event_id FROM events
WHERE sender = ? AND room_id = ?
{filter_sql}
ORDER BY received_ts DESC
LIMIT ?
OFFSET ?
"""
txn.execute(sql, (user_id, room_id, *filter, batch_size, offset))
else:
sql = """
SELECT event_id FROM events
WHERE sender = ? AND room_id = ?
ORDER BY received_ts DESC
LIMIT ?
OFFSET ?
"""
Comment thread
H-Shay marked this conversation as resolved.
Outdated
txn.execute(sql, (user_id, room_id, batch_size, offset))
res = txn.fetchall()
if res:
events = [row[0] for row in res]
Expand All @@ -2479,9 +2495,6 @@ def _get_events_by_user_txn(

return events, offset + batch_size

if not limit:
limit = 1000

offset = 0
batch_size = 100
if batch_size < limit:
Comment thread
H-Shay marked this conversation as resolved.
Outdated
Expand All @@ -2491,7 +2504,7 @@ def _get_events_by_user_txn(
while offset < limit:
res, offset = await self.db_pool.runInteraction(
"get_events_by_user",
Comment thread
H-Shay marked this conversation as resolved.
_get_events_by_user_txn,
_get_events_by_user_in_room_txn,
user_id,
room_id,
filter,
Expand All @@ -2501,5 +2514,5 @@ def _get_events_by_user_txn(
if res:
selected_ids = selected_ids + res
else:
return selected_ids
break
return selected_ids