Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export class MarkNotificationsAsSeen {
const updatedMessages: MessageEntity[] = [];
// If notificationIds are provided, use them; otherwise use filters
if (notificationIds && notificationIds.length > 0) {
const BATCH_SIZE = 50;
const BATCH_SIZE = 500;
const notificationIdChunks = this.chunkArray(notificationIds, BATCH_SIZE);

for (const idChunk of notificationIdChunks) {
Expand Down
8 changes: 6 additions & 2 deletions libs/dal/src/repositories/base-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,12 @@ export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement> {
});
}

async bulkWrite(bulkOperations: any, ordered = false): Promise<any> {
return await this.MongooseModel.bulkWrite(bulkOperations, { ordered });
/**
 * Executes a MongoDB bulk write against the underlying Mongoose model,
 * forwarding a restricted subset of driver options.
 *
 * @param bulkOperations - operations array passed straight through to
 *   `Model.bulkWrite`; typed `any` here — presumably
 *   `AnyBulkWriteOperation[]` from the mongodb driver, TODO confirm.
 * @param ordered - defaults to `false`, so the driver may continue past
 *   individual operation errors instead of stopping at the first failure.
 * @param options - optional `writeConcern` (e.g. `{ w: 1 }`) and/or
 *   `session`; `Pick` deliberately limits callers to these two driver
 *   options. Spread after `ordered`, so a caller-supplied field of the
 *   same name would win — only `writeConcern`/`session` are allowed, so
 *   `ordered` itself cannot be overridden this way.
 * @returns the driver's bulk write result (untyped `any` passthrough).
 */
async bulkWrite(
bulkOperations: any,
ordered = false,
options?: Pick<mongo.BulkWriteOptions, 'writeConcern' | 'session'>
): Promise<any> {
return await this.MongooseModel.bulkWrite(bulkOperations, { ordered, ...options });
}

protected mapEntity<TData>(data: TData): TData extends null ? null : T_MappedEntity {
Expand Down
28 changes: 20 additions & 8 deletions libs/dal/src/repositories/message/message.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ function mergeTagsMongoFragment<MessageQueryT extends MessageQuery & EnforceEnvI

export class MessageRepository extends BaseRepository<MessageDBModel, MessageEntity, EnforceEnvId> {
private static readonly BATCH_SIZE = 100;
/** Larger chunks for status updates: pairs of updateMany are merged via bulkWrite per chunk. */
private static readonly STATUS_UPDATE_CHUNK_SIZE = 1000;
private feedRepository = new FeedRepository();
constructor() {
super(Message, MessageEntity);
Expand Down Expand Up @@ -618,8 +620,7 @@ export class MessageRepository extends BaseRepository<MessageDBModel, MessageEnt
// Extract IDs for targeted update
const documentIds = documentsToUpdate.map((doc) => doc._id);

// Perform the update using document IDs in batches
const chunks = this.chunkArray(documentIds);
const chunks = this.chunkArray(documentIds, MessageRepository.STATUS_UPDATE_CHUNK_SIZE);

for (const chunk of chunks) {
await this.update(
Expand Down Expand Up @@ -934,17 +935,28 @@ export class MessageRepository extends BaseRepository<MessageDBModel, MessageEnt
// Handle firstSeenDate logic separately for operations that mark as seen
const shouldMarkAsSeen = isUpdatingArchived || isUpdatingRead || (isUpdatingSeen && seen) || isUpdatingSnoozed;

// Batch the updates
const chunks = this.chunkArray(documentIds);
const chunks = this.chunkArray(documentIds, MessageRepository.STATUS_UPDATE_CHUNK_SIZE);

for (const chunk of chunks) {
const chunkQuery = { _id: { $in: chunk }, _environmentId: query._environmentId };

if (shouldMarkAsSeen) {
await this.update(chunkQuery, { $set: updatePayload }, { writeConcern: { w: 1 } });
await this.update(
{ ...chunkQuery, firstSeenDate: { $exists: false } },
{ $set: { firstSeenDate: new Date() } },
await this.bulkWrite(
[
{
updateMany: {
filter: chunkQuery,
update: { $set: updatePayload },
},
},
{
updateMany: {
filter: { ...chunkQuery, firstSeenDate: { $exists: false } },
update: { $set: { firstSeenDate: new Date() } },
},
},
],
true,
{ writeConcern: { w: 1 } }
);
} else {
Expand Down
Loading