Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 171 additions & 1 deletion apps/api/src/app/agents/services/agent-conversation.service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Injectable } from '@nestjs/common';
import { BadRequestException, Injectable } from '@nestjs/common';
import { PinoLogger, shortId } from '@novu/application-generic';
import {
ConversationActivityEntity,
ConversationActivityRepository,
ConversationActivitySenderTypeEnum,
ConversationActivityTypeEnum,
ConversationChannel,
ConversationEntity,
ConversationParticipantTypeEnum,
ConversationRepository,
Expand Down Expand Up @@ -38,6 +40,32 @@ export interface PersistInboundMessageParams {
organizationId: string;
}

export interface ConversationActivityContext {
conversationId: string;
channel: ConversationChannel;
agentIdentifier: string;
environmentId: string;
organizationId: string;
}

export interface PersistAgentActivityParams extends ConversationActivityContext {
platformMessageId: string;
/** Overrides channel.platformThreadId when delivery returns a different thread ID */
platformThreadId?: string;
agentName?: string;
content: string;
richContent?: Record<string, unknown>;
}

export interface UpdateMetadataParams extends ConversationActivityContext {
currentMetadata: Record<string, unknown>;
signals: Array<{ key: string; value: unknown }>;
}

export interface ResolveConversationParams extends ConversationActivityContext {
summary?: string;
}

@Injectable()
export class AgentConversationService {
constructor(
Expand All @@ -46,6 +74,15 @@ export class AgentConversationService {
private readonly logger: PinoLogger
) {}

getPrimaryChannel(conversation: ConversationEntity): ConversationChannel {
const channel = conversation.channels?.[0];
if (!channel) {
throw new BadRequestException(`Conversation ${conversation._id} has no channel`);
}

return channel;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async createOrGetConversation(params: CreateOrGetConversationParams): Promise<ConversationEntity> {
const { environmentId, organizationId, platformThreadId } = params;

Expand Down Expand Up @@ -165,4 +202,137 @@ export class AgentConversationService {
): Promise<void> {
await this.conversationRepository.updateChannelThread(environmentId, organizationId, conversationId, platformThreadId, serializedThread);
}

async getConversation(
conversationId: string,
environmentId: string,
organizationId: string
): Promise<ConversationEntity | null> {
return this.conversationRepository.findOne(
{ _id: conversationId, _environmentId: environmentId, _organizationId: organizationId },
'*'
);
}

async findByPlatformThread(
environmentId: string,
organizationId: string,
platformThreadId: string
): Promise<ConversationEntity | null> {
return this.conversationRepository.findByPlatformThread(environmentId, organizationId, platformThreadId);
}

async setFirstPlatformMessageId(
environmentId: string,
organizationId: string,
conversationId: string,
platformThreadId: string,
messageId: string
): Promise<void> {
await this.conversationRepository.setFirstPlatformMessageId(
environmentId,
organizationId,
conversationId,
platformThreadId,
messageId
);
}

async persistAgentMessage(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> {
return this.persistAgentActivity(params, ConversationActivityTypeEnum.MESSAGE, 'activity');
}

async persistAgentEdit(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> {
return this.persistAgentActivity(params, ConversationActivityTypeEnum.EDIT, 'preview');
}

private async persistAgentActivity(
params: PersistAgentActivityParams,
type: ConversationActivityTypeEnum,
touch: 'activity' | 'preview'
): Promise<ConversationActivityEntity> {
const threadId = params.platformThreadId ?? params.channel.platformThreadId;

const touchFn =
touch === 'activity'
? this.conversationRepository.touchActivity.bind(this.conversationRepository)
: this.conversationRepository.touchPreview.bind(this.conversationRepository);

const [activity] = await Promise.all([
this.activityRepository.createAgentActivity({
identifier: `act_${shortId(12)}`,
conversationId: params.conversationId,
platform: params.channel.platform,
integrationId: params.channel._integrationId,
platformThreadId: threadId,
platformMessageId: params.platformMessageId,
agentId: params.agentIdentifier,
senderName: params.agentName,
content: params.content,
richContent: params.richContent,
type,
environmentId: params.environmentId,
organizationId: params.organizationId,
}),
touchFn(params.environmentId, params.organizationId, params.conversationId, params.content),
]);

return activity;
}

async updateMetadata(params: UpdateMetadataParams): Promise<void> {
const merged: Record<string, unknown> = { ...(params.currentMetadata ?? {}) };
for (const signal of params.signals) {
merged[signal.key] = signal.value;
}

const serialized = JSON.stringify(merged);
if (Buffer.byteLength(serialized) > 65_536) {
throw new BadRequestException('Conversation metadata exceeds 64KB limit');
}

await Promise.all([
this.conversationRepository.updateMetadata(
params.environmentId,
params.organizationId,
params.conversationId,
merged
),
Comment thread
ChmaraX marked this conversation as resolved.
this.activityRepository.createSignalActivity({
identifier: `act_${shortId(12)}`,
conversationId: params.conversationId,
platform: params.channel.platform,
integrationId: params.channel._integrationId,
platformThreadId: params.channel.platformThreadId,
agentId: params.agentIdentifier,
content: `Metadata updated: ${params.signals.map((s) => s.key).join(', ')}`,
signalData: { type: 'metadata', payload: merged },
environmentId: params.environmentId,
organizationId: params.organizationId,
}),
]);
}

async resolveConversation(params: ResolveConversationParams): Promise<void> {
await Promise.all([
this.conversationRepository.updateStatus(
params.environmentId,
params.organizationId,
params.conversationId,
ConversationStatusEnum.RESOLVED
),
this.activityRepository.createSignalActivity({
identifier: `act_${shortId(12)}`,
conversationId: params.conversationId,
platform: params.channel.platform,
integrationId: params.channel._integrationId,
platformThreadId: params.channel.platformThreadId,
agentId: params.agentIdentifier,
content: params.summary ?? 'Conversation resolved',
signalData: { type: 'resolve', payload: params.summary ? { summary: params.summary } : undefined },
environmentId: params.environmentId,
organizationId: params.organizationId,
}),
]);
}
}
40 changes: 17 additions & 23 deletions apps/api/src/app/agents/services/agent-inbound-handler.service.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { PinoLogger } from '@novu/application-generic';
import {
ConversationActivitySenderTypeEnum,
ConversationParticipantTypeEnum,
ConversationRepository,
SubscriberRepository,
} from '@novu/dal';
import type { EmojiValue, Message, Thread } from 'chat';
import { AgentEventEnum } from '../dtos/agent-event.enum';
import { PLATFORMS_WITH_TYPING_INDICATOR } from '../dtos/agent-platform.enum';
import { HandleAgentReplyCommand } from '../usecases/handle-agent-reply/handle-agent-reply.command';
import { HandleAgentReply } from '../usecases/handle-agent-reply/handle-agent-reply.usecase';
import { ResolvedAgentConfig } from './agent-config-resolver.service';
import { AgentConversationService } from './agent-conversation.service';
import { AgentSubscriberResolver } from './agent-subscriber-resolver.service';
Expand Down Expand Up @@ -38,11 +35,8 @@ export class AgentInboundHandler {
private readonly logger: PinoLogger,
private readonly subscriberResolver: AgentSubscriberResolver,
private readonly conversationService: AgentConversationService,
private readonly conversationRepository: ConversationRepository,
private readonly bridgeExecutor: BridgeExecutorService,
private readonly subscriberRepository: SubscriberRepository,
@Inject(forwardRef(() => HandleAgentReply))
private readonly handleAgentReply: HandleAgentReply
private readonly subscriberRepository: SubscriberRepository
) {}

async handle(
Expand Down Expand Up @@ -115,8 +109,8 @@ export class AgentInboundHandler {
organizationId: config.organizationId,
});

const channel = conversation.channels?.[0];
const isFirstMessage = !channel?.firstPlatformMessageId;
const primaryChannel = this.conversationService.getPrimaryChannel(conversation);
const isFirstMessage = !primaryChannel.firstPlatformMessageId;

if (config.acknowledgeOnReceived) {
const supportsTyping = PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform);
Expand All @@ -131,7 +125,7 @@ export class AgentInboundHandler {
this.logger.warn(err, `[agent:${agentId}] Failed to add ack reaction to first message`);
});

this.conversationRepository
this.conversationService
.setFirstPlatformMessageId(
config.environmentId,
config.organizationId,
Expand Down Expand Up @@ -177,17 +171,17 @@ export class AgentInboundHandler {
});
} catch (err) {
if (err instanceof NoBridgeUrlError) {
await this.handleAgentReply.execute(
HandleAgentReplyCommand.create({
userId: 'system',
environmentId: config.environmentId,
organizationId: config.organizationId,
conversationId: conversation._id,
agentIdentifier: config.agentIdentifier,
integrationIdentifier: config.integrationIdentifier,
reply: { text: ONBOARDING_NO_BRIDGE_REPLY_MARKDOWN },
})
);
const sent = await thread.post(ONBOARDING_NO_BRIDGE_REPLY_MARKDOWN);
const channel = this.conversationService.getPrimaryChannel(conversation);
await this.conversationService.persistAgentMessage({
conversationId: conversation._id,
channel,
platformMessageId: (sent as { id?: string })?.id ?? '',
agentIdentifier: config.agentIdentifier,
content: ONBOARDING_NO_BRIDGE_REPLY_MARKDOWN,
environmentId: config.environmentId,
organizationId: config.organizationId,
});

return;
}
Expand All @@ -204,7 +198,7 @@ export class AgentInboundHandler {
return;
}

const conversation = await this.conversationRepository.findByPlatformThread(
const conversation = await this.conversationService.findByPlatformThread(
config.environmentId,
config.organizationId,
threadId
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BadRequestException, forwardRef, Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
import { BadRequestException, Injectable, OnModuleDestroy } from '@nestjs/common';
import { CacheService, PinoLogger } from '@novu/application-generic';
import type { SentMessageInfo } from '@novu/framework';
import type { AdapterPostableMessage, Chat, EmojiValue, Message, ReactionEvent, Thread } from 'chat';
Expand Down Expand Up @@ -58,7 +58,6 @@ export class ChatSdkService implements OnModuleDestroy {
private readonly logger: PinoLogger,
private readonly cacheService: CacheService,
private readonly agentConfigResolver: AgentConfigResolver,
@Inject(forwardRef(() => AgentInboundHandler))
private readonly inboundHandler: AgentInboundHandler
) {
this.instances = new LRUCache<string, CachedChat>({
Expand Down
Loading
Loading