diff --git a/apps/api/src/app/agents/agents-webhook.controller.ts b/apps/api/src/app/agents/agents-webhook.controller.ts index 724a225f861..fec394854c3 100644 --- a/apps/api/src/app/agents/agents-webhook.controller.ts +++ b/apps/api/src/app/agents/agents-webhook.controller.ts @@ -50,7 +50,7 @@ export class AgentsWebhookController { agentIdentifier: agentId, integrationIdentifier: body.integrationIdentifier, reply: body.reply, - update: body.update, + edit: body.edit, resolve: body.resolve, signals: body.signals as Signal[], }) diff --git a/apps/api/src/app/agents/dtos/agent-reply-payload.dto.ts b/apps/api/src/app/agents/dtos/agent-reply-payload.dto.ts index a2b8e6bd499..52199223a2a 100644 --- a/apps/api/src/app/agents/dtos/agent-reply-payload.dto.ts +++ b/apps/api/src/app/agents/dtos/agent-reply-payload.dto.ts @@ -87,6 +87,20 @@ export class ReplyContentDto { files?: FileRef[]; } +export class EditPayloadDto { + @ApiProperty() + @IsString() + @IsNotEmpty() + messageId: string; + + @ApiProperty({ type: ReplyContentDto }) + @IsObject() + @ValidateNested() + @Validate(IsValidReplyContent) + @Type(() => ReplyContentDto) + content: ReplyContentDto; +} + export class ResolveDto { @ApiPropertyOptional() @IsOptional() @@ -144,13 +158,12 @@ export class AgentReplyPayloadDto { @Type(() => ReplyContentDto) reply?: ReplyContentDto; - @ApiPropertyOptional({ type: ReplyContentDto }) + @ApiPropertyOptional({ type: EditPayloadDto }) @IsOptional() @IsObject() @ValidateNested() - @Validate(IsValidReplyContent) - @Type(() => ReplyContentDto) - update?: ReplyContentDto; + @Type(() => EditPayloadDto) + edit?: EditPayloadDto; @ApiPropertyOptional({ type: ResolveDto }) @IsOptional() diff --git a/apps/api/src/app/agents/e2e/agent-reply.e2e.ts b/apps/api/src/app/agents/e2e/agent-reply.e2e.ts index 7f67eaecf80..69b13b036c8 100644 --- a/apps/api/src/app/agents/e2e/agent-reply.e2e.ts +++ b/apps/api/src/app/agents/e2e/agent-reply.e2e.ts @@ -34,7 +34,12 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { }); const chatSdkService = testServer.getService(ChatSdkService); - sinon.stub(chatSdkService, 'postToConversation').resolves(); + sinon + .stub(chatSdkService, 'postToConversation') + .resolves({ messageId: 'platform-msg-1', platformThreadId: 'platform-thread-1' }); + sinon + .stub(chatSdkService, 'editInConversation') + .resolves({ messageId: 'platform-msg-1', platformThreadId: 'platform-thread-1' }); sinon.stub(chatSdkService, 'reactToMessage').resolves(); sinon.stub(chatSdkService, 'removeReaction').resolves(); }); @@ -61,7 +66,7 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { }); expect(res.status).to.equal(200); - expect(res.body.data.status).to.equal('ok'); + expect(res.body.data?.messageId).to.equal('platform-msg-1'); const convAfter = await conversationRepository.findOne( { _id: conversationId, _environmentId: ctx.session.environment._id }, @@ -81,42 +86,83 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { expect(agentActivity!.content).to.equal('Hello from agent'); }); - it('should persist update activity and return early without executing resolve', async () => { + it('should return messageId/platformThreadId on successful reply', async () => { const conversationId = await seedConversation(ctx); const res = await postReply({ conversationId, integrationIdentifier: ctx.integrationIdentifier, - update: { text: 'Processing...' }, - resolve: { summary: 'Should be ignored' }, + reply: { text: 'Hello' }, }); expect(res.status).to.equal(200); - expect(res.body.data.status).to.equal('update_sent'); + expect(res.body.data.messageId).to.equal('platform-msg-1'); + expect(res.body.data.platformThreadId).to.equal('platform-thread-1'); + }); + + it('should edit a previously sent message and persist an edit activity', async () => { + const conversationId = await seedConversation(ctx); + + const convBefore = await conversationRepository.findOne( + { _id: conversationId, _environmentId: ctx.session.environment._id }, + '*' + ); + const countBefore = convBefore!.messageCount; + + const res = await postReply({ + conversationId, + integrationIdentifier: ctx.integrationIdentifier, + edit: { + messageId: 'platform-msg-1', + content: { text: 'Edited content' }, + }, + }); + + expect(res.status).to.equal(200); + expect(res.body.data.messageId).to.equal('platform-msg-1'); + expect(res.body.data.platformThreadId).to.equal('platform-thread-1'); const activities = await activityRepository.findByConversation( ctx.session.environment._id, conversationId ); - const updateActivity = activities.find((a) => a.type === ConversationActivityTypeEnum.UPDATE); - expect(updateActivity).to.exist; - expect(updateActivity!.content).to.equal('Processing...'); + const editActivity = activities.find((a) => a.type === ConversationActivityTypeEnum.EDIT); + expect(editActivity).to.exist; + expect(editActivity!.content).to.equal('Edited content'); + expect(editActivity!.platformMessageId).to.equal('platform-msg-1'); const conversation = await conversationRepository.findOne( { _id: conversationId, _environmentId: ctx.session.environment._id }, '*' ); expect(conversation!.status).to.equal(ConversationStatusEnum.ACTIVE); + // Edit refreshes the conversation's lastMessagePreview to the new content... + expect(conversation!.lastMessagePreview).to.equal('Edited content'); + // ...without bumping messageCount (edits mutate an existing message, not add one). + expect(conversation!.messageCount).to.equal(countBefore); }); - it('should reject when both reply and update are provided', async () => { + it('should reject when both reply and edit are provided', async () => { const conversationId = await seedConversation(ctx); const res = await postReply({ conversationId, integrationIdentifier: ctx.integrationIdentifier, reply: { text: 'a' }, - update: { text: 'b' }, + edit: { messageId: 'platform-msg-1', content: { text: 'b' } }, + }); + + expect(res.status).to.equal(400); + }); + + it('should reject when edit is combined with signals', async () => { + const conversationId = await seedConversation(ctx); + + const res = await postReply({ + conversationId, + integrationIdentifier: ctx.integrationIdentifier, + edit: { messageId: 'platform-msg-1', content: { text: 'b' } }, + signals: [{ type: 'metadata', key: 'k', value: 'v' }], }); expect(res.status).to.equal(400); @@ -146,7 +192,7 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { }); expect(res.status).to.equal(200); - expect(res.body.data.status).to.equal('ok'); + expect(res.body.data).to.be.null; const conversation = await conversationRepository.findOne( { _id: conversationId, _environmentId: ctx.session.environment._id }, @@ -195,7 +241,7 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { }); expect(res.status).to.equal(200); - expect(res.body.data.status).to.equal('ok'); + expect(res.body.data).to.be.null; const conversation = await conversationRepository.findOne( { _id: conversationId, _environmentId: ctx.session.environment._id }, @@ -237,7 +283,8 @@ describe('Agent Reply - /agents/:agentId/reply #novu-v2', () => { }); expect(res.status).to.equal(200); - expect(res.body.data.status).to.equal('ok'); + expect(res.body.data.messageId).to.equal('platform-msg-1'); + expect(res.body.data.platformThreadId).to.equal('platform-thread-1'); const convAfter = await conversationRepository.findOne( { _id: conversationId, _environmentId: ctx.session.environment._id }, diff --git a/apps/api/src/app/agents/services/chat-sdk.service.ts b/apps/api/src/app/agents/services/chat-sdk.service.ts index 6c1ab7020e7..14e640ff766 100644 --- a/apps/api/src/app/agents/services/chat-sdk.service.ts +++ b/apps/api/src/app/agents/services/chat-sdk.service.ts @@ -1,6 +1,7 @@ import { BadRequestException, forwardRef, Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; import { PinoLogger } from '@novu/application-generic'; -import type { Chat, EmojiValue, Message, ReactionEvent, Thread } from 'chat'; +import type { SentMessageInfo } from '@novu/framework'; +import type { AdapterPostableMessage, Chat, EmojiValue, Message, ReactionEvent, Thread } from 'chat'; import { Request as ExpressRequest, Response as ExpressResponse } from 'express'; import { LRUCache } from 'lru-cache'; import { AgentEventEnum } from '../dtos/agent-event.enum'; @@ -105,7 +106,7 @@ export class ChatSdkService implements OnModuleDestroy { platform: string, serializedThread: Record, content: ReplyContentDto - ): Promise { + ): Promise { const config = await this.agentConfigResolver.resolve(agentId, integrationIdentifier); const instanceKey = `${agentId}:${integrationIdentifier}`; const chat = await this.getOrCreate(instanceKey, agentId, config.platform, config); @@ -114,13 +115,52 @@ export class ChatSdkService implements OnModuleDestroy { const adapter = chat.getAdapter(platform); const thread = ThreadImpl.fromJSON(serializedThread, adapter); + let sent: { id: string; threadId: string }; + if (content.card) { + sent = await thread.post(content.card); + } else if (content.markdown !== undefined) { + sent = await thread.post({ markdown: content.markdown, files: content.files }); + } else { + sent = await thread.post(content.text ?? ''); + } + + return { messageId: sent.id, platformThreadId: sent.threadId }; + } + + async editInConversation( + agentId: string, + integrationIdentifier: string, + platform: string, + platformThreadId: string, + platformMessageId: string, + content: ReplyContentDto + ): Promise { + const config = await this.agentConfigResolver.resolve(agentId, integrationIdentifier); + const instanceKey = `${agentId}:${integrationIdentifier}`; + const chat = await this.getOrCreate(instanceKey, agentId, config.platform, config); + + const adapter = chat.getAdapter(platform); + if (typeof adapter.editMessage !== 'function') { + throw new BadRequestException(`Platform ${platform} does not support editing messages`); + } + + let edited: { id: string; threadId: string }; if (content.card) { - await thread.post(content.card); + edited = await adapter.editMessage( + platformThreadId, + platformMessageId, + content.card as unknown as AdapterPostableMessage + ); } else if (content.markdown !== undefined) { - await thread.post({ markdown: content.markdown, files: content.files }); + edited = await adapter.editMessage(platformThreadId, platformMessageId, { + markdown: content.markdown, + files: content.files, + } as unknown as AdapterPostableMessage); } else { - await thread.post(content.text ?? ''); + edited = await adapter.editMessage(platformThreadId, platformMessageId, content.text ?? ''); } + + return { messageId: edited.id, platformThreadId: edited.threadId }; } async removeReaction( diff --git a/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.command.ts b/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.command.ts index b47b65962ed..9e03d666a3d 100644 --- a/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.command.ts +++ b/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.command.ts @@ -2,7 +2,7 @@ import type { Signal } from '@novu/framework'; import { Type } from 'class-transformer'; import { IsArray, IsNotEmpty, IsObject, IsOptional, IsString, ValidateNested } from 'class-validator'; import { EnvironmentWithUserCommand } from '../../../shared/commands/project.command'; -import { ReplyContentDto } from '../../dtos/agent-reply-payload.dto'; +import { EditPayloadDto, ReplyContentDto } from '../../dtos/agent-reply-payload.dto'; export type { Signal } from '@novu/framework'; @@ -26,8 +26,8 @@ export class HandleAgentReplyCommand extends EnvironmentWithUserCommand { @IsOptional() @ValidateNested() - @Type(() => ReplyContentDto) - update?: ReplyContentDto; + @Type(() => EditPayloadDto) + edit?: EditPayloadDto; @IsOptional() @IsObject() diff --git a/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts b/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts index 1e873751f2a..1cb610a5751 100644 --- a/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts +++ b/apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts @@ -17,8 +17,9 @@ import { ConversationStatusEnum, SubscriberRepository, } from '@novu/dal'; +import type { SentMessageInfo } from '@novu/framework'; import { AgentEventEnum } from '../../dtos/agent-event.enum'; -import type { ReplyContentDto } from '../../dtos/agent-reply-payload.dto'; +import type { EditPayloadDto, ReplyContentDto } from '../../dtos/agent-reply-payload.dto'; import { AgentConfigResolver, ResolvedAgentConfig } from '../../services/agent-config-resolver.service'; import { AgentConversationService } from '../../services/agent-conversation.service'; import { BridgeExecutorService } from '../../services/bridge-executor.service'; @@ -40,12 +41,15 @@ export class HandleAgentReply { private readonly logger: PinoLogger ) {} - async execute(command: HandleAgentReplyCommand): Promise<{ status: string }> { - if (command.reply && command.update) { - throw new BadRequestException('Only one of reply or update can be provided'); + async execute(command: HandleAgentReplyCommand): Promise { + if (command.reply && command.edit) { + throw new BadRequestException('Only one of reply or edit can be provided'); } - if (!command.reply && !command.update && !command.resolve && !command.signals?.length) { - throw new BadRequestException('At least one of reply, update, resolve, or signals must be provided'); + if (command.edit && (command.resolve || command.signals?.length)) { + throw new BadRequestException('edit cannot be combined with resolve or signals'); + } + if (!command.reply && !command.edit && !command.resolve && !command.signals?.length) { + throw new BadRequestException('At least one of reply, edit, resolve, or signals must be provided'); } const conversation = await this.conversationRepository.findOne( @@ -62,19 +66,10 @@ export class HandleAgentReply { const channel = this.getPrimaryChannel(conversation); - if (command.update) { + if (command.edit) { const agentName = await this.resolveValidatedAgentNameForDelivery(command, conversation); - await this.deliverMessage( - command, - conversation, - channel, - command.update, - ConversationActivityTypeEnum.UPDATE, - agentName - ); - - return { status: 'update_sent' }; + return this.deliverEdit(command, conversation, channel, command.edit, agentName); } const needsConfig = !!(command.reply || command.resolve); @@ -82,10 +77,11 @@ export class HandleAgentReply { ? await this.agentConfigResolver.resolve(conversation._agentId, command.integrationIdentifier) : null; + let replyInfo: SentMessageInfo | undefined; if (command.reply) { const agentName = await this.resolveValidatedAgentNameForDelivery(command, conversation); - await this.deliverMessage( + replyInfo = await this.deliverMessage( command, conversation, channel, @@ -107,7 +103,7 @@ export class HandleAgentReply { await this.resolveConversation(command, config!, conversation, channel, command.resolve); } - return { status: 'ok' }; + return replyInfo ?? null; } private async resolveValidatedAgentNameForDelivery( @@ -150,23 +146,25 @@ export class HandleAgentReply { content: ReplyContentDto, type: ConversationActivityTypeEnum, agentName?: string - ): Promise { + ): Promise { const textFallback = this.extractTextFallback(content); + const sent = await this.chatSdkService.postToConversation( + conversation._agentId, + command.integrationIdentifier, + channel.platform, + channel.serializedThread!, + content + ); + await Promise.all([ - this.chatSdkService.postToConversation( - conversation._agentId, - command.integrationIdentifier, - channel.platform, - channel.serializedThread!, - content - ), this.activityRepository.createAgentActivity({ identifier: `act_${shortId(12)}`, conversationId: conversation._id, platform: channel.platform, integrationId: channel._integrationId, - platformThreadId: channel.platformThreadId, + platformThreadId: sent.platformThreadId || channel.platformThreadId, + platformMessageId: sent.messageId, agentId: command.agentIdentifier, senderName: agentName, content: textFallback, @@ -182,6 +180,56 @@ export class HandleAgentReply { textFallback ), ]); + + return sent; + } + + private async deliverEdit( + command: HandleAgentReplyCommand, + conversation: ConversationEntity, + channel: ConversationChannel, + edit: EditPayloadDto, + agentName?: string + ): Promise { + const textFallback = this.extractTextFallback(edit.content); + + const sent = await this.chatSdkService.editInConversation( + conversation._agentId, + command.integrationIdentifier, + channel.platform, + channel.platformThreadId, + edit.messageId, + edit.content + ); + + await Promise.all([ + this.activityRepository.createAgentActivity({ + identifier: `act_${shortId(12)}`, + conversationId: conversation._id, + platform: channel.platform, + integrationId: channel._integrationId, + platformThreadId: sent.platformThreadId || channel.platformThreadId, + platformMessageId: sent.messageId, + agentId: command.agentIdentifier, + senderName: agentName, + content: textFallback, + richContent: + edit.content.card || edit.content.files?.length ? (edit.content as Record) : undefined, + type: ConversationActivityTypeEnum.EDIT, + environmentId: command.environmentId, + organizationId: command.organizationId, + }), + // Refresh the conversation's lastMessagePreview + lastActivityAt without + // incrementing messageCount — edits mutate an existing message, they don't add one. + this.conversationRepository.touchPreview( + command.environmentId, + command.organizationId, + conversation._id, + textFallback + ), + ]); + + return sent; } private extractTextFallback(content: ReplyContentDto): string { diff --git a/libs/dal/src/repositories/conversation-activity/conversation-activity.entity.ts b/libs/dal/src/repositories/conversation-activity/conversation-activity.entity.ts index 0a5fc74c3f1..2d4b6123a41 100644 --- a/libs/dal/src/repositories/conversation-activity/conversation-activity.entity.ts +++ b/libs/dal/src/repositories/conversation-activity/conversation-activity.entity.ts @@ -4,8 +4,8 @@ import { OrganizationId } from '../organization'; export enum ConversationActivityTypeEnum { MESSAGE = 'message', - /** Interim status update sent via ctx.update() */ - UPDATE = 'update', + /** In-place edit of a previously sent agent message, via replyHandle.edit() */ + EDIT = 'edit', /** System-generated timeline event (e.g. workflow triggered, conversation resolved) */ SIGNAL = 'signal', } diff --git a/libs/dal/src/repositories/conversation-activity/conversation-activity.repository.ts b/libs/dal/src/repositories/conversation-activity/conversation-activity.repository.ts index 6c51f8bb4f7..778b2420958 100644 --- a/libs/dal/src/repositories/conversation-activity/conversation-activity.repository.ts +++ b/libs/dal/src/repositories/conversation-activity/conversation-activity.repository.ts @@ -89,6 +89,7 @@ export class ConversationActivityRepository extends BaseRepositoryV2< richContent?: Record; type?: ConversationActivityTypeEnum; senderName?: string; + platformMessageId?: string; environmentId: string; organizationId: string; }): Promise { @@ -104,6 +105,7 @@ export class ConversationActivityRepository extends BaseRepositoryV2< content: params.content, richContent: params.richContent, senderName: params.senderName, + platformMessageId: params.platformMessageId, _environmentId: params.environmentId, _organizationId: params.organizationId, }); diff --git a/libs/dal/src/repositories/conversation/conversation.repository.ts b/libs/dal/src/repositories/conversation/conversation.repository.ts index b4c5b72cc7c..655ff24d16a 100644 --- a/libs/dal/src/repositories/conversation/conversation.repository.ts +++ b/libs/dal/src/repositories/conversation/conversation.repository.ts @@ -124,6 +124,28 @@ export class ConversationRepository extends BaseRepositoryV2< ); } + /** + * Refresh `lastActivityAt` and `lastMessagePreview` without incrementing `messageCount`. + * Used for in-place message edits (replyHandle.edit) — the message count stays the same, + * but the conversation's timeline and preview should reflect the latest content. + */ + async touchPreview( + environmentId: string, + organizationId: string, + id: string, + messagePreview: string + ): Promise { + await this.update( + { _id: id, _environmentId: environmentId, _organizationId: organizationId }, + { + $set: { + lastActivityAt: new Date().toISOString(), + lastMessagePreview: messagePreview.slice(0, 200), + }, + } + ); + } + async updateChannelThread( environmentId: string, organizationId: string, diff --git a/packages/framework/src/index.ts b/packages/framework/src/index.ts index fe8a6ff1daa..98e619559b0 100644 --- a/packages/framework/src/index.ts +++ b/packages/framework/src/index.ts @@ -18,10 +18,13 @@ export type { AgentSubscriber, CardChild, CardElement, + EditPayload, FileRef, MessageContent, MetadataSignal, ReplyContent, + ReplyHandle, + SentMessageInfo, Signal, TriggerSignal, } from './resources'; diff --git a/packages/framework/src/resources/agent/agent.context.ts b/packages/framework/src/resources/agent/agent.context.ts index e4764ddfd65..934c17ad1b8 100644 --- a/packages/framework/src/resources/agent/agent.context.ts +++ b/packages/framework/src/resources/agent/agent.context.ts @@ -12,6 +12,8 @@ import type { AgentSubscriber, MessageContent, ReplyContent, + ReplyHandle, + SentMessageInfo, Signal, } from './agent.types'; @@ -47,6 +49,49 @@ function serializeContent(content: MessageContent): ReplyContent { throw new Error('Invalid message content — expected string, { markdown }, or CardElement'); } +interface ReplyPoster { + post(body: AgentReplyPayload): Promise; +} + +class ReplyHandleImpl implements ReplyHandle { + public messageId: string; + public platformThreadId: string; + + constructor( + messageId: string, + platformThreadId: string, + private readonly conversationId: string, + private readonly integrationIdentifier: string, + private readonly poster: ReplyPoster + ) { + this.messageId = messageId; + this.platformThreadId = platformThreadId; + } + + async edit(content: MessageContent): Promise { + const info = await this.poster.post({ + conversationId: this.conversationId, + integrationIdentifier: this.integrationIdentifier, + edit: { + messageId: this.messageId, + content: serializeContent(content), + }, + }); + + if (!info) { + throw new Error('Agent edit did not return a message handle'); + } + + // Mutate-in-place: the handle represents the same platform message, so we refresh + // ids from the edit response (Slack/Teams preserve them; other platforms may not) + // and return `this` to honour the "same handle for chaining" contract. + this.messageId = info.messageId; + this.platformThreadId = info.platformThreadId; + + return this; + } +} + export class AgentContextImpl implements AgentContext { readonly event: string; readonly action: AgentAction | null; @@ -66,6 +111,7 @@ export class AgentContextImpl implements AgentContext { private readonly _conversationId: string; private readonly _integrationIdentifier: string; private readonly _secretKey: string; + private readonly _poster: ReplyPoster; constructor(request: AgentBridgeRequest, secretKey: string) { this.event = request.event; @@ -82,6 +128,7 @@ export class AgentContextImpl implements AgentContext { this._conversationId = request.conversationId; this._integrationIdentifier = request.integrationIdentifier; this._secretKey = secretKey; + this._poster = { post: (body) => this._post(body) }; this.metadata = { set: (key: string, value: unknown) => { @@ -90,7 +137,7 @@ export class AgentContextImpl implements AgentContext { }; } - async reply(content: MessageContent): Promise { + async reply(content: MessageContent): Promise { const body: AgentReplyPayload = { conversationId: this._conversationId, integrationIdentifier: this._integrationIdentifier, @@ -107,17 +154,18 @@ export class AgentContextImpl implements AgentContext { this._resolveSignal = null; } - await this._post(body); - } - - async update(content: MessageContent): Promise { - const body: AgentReplyPayload = { - conversationId: this._conversationId, - integrationIdentifier: this._integrationIdentifier, - update: serializeContent(content), - }; + const info = await this._post(body); + if (!info) { + throw new Error('Agent reply did not return a message handle'); + } - await this._post(body); + return new ReplyHandleImpl( + info.messageId, + info.platformThreadId, + this._conversationId, + this._integrationIdentifier, + this._poster + ); } resolve(summary?: string): void { @@ -155,7 +203,7 @@ export class AgentContextImpl implements AgentContext { await this._post(body); } - private async _post(body: AgentReplyPayload): Promise { + private async _post(body: AgentReplyPayload): Promise { const response = await fetch(this._replyUrl, { method: 'POST', headers: { @@ -169,5 +217,25 @@ export class AgentContextImpl implements AgentContext { const text = await response.text().catch(() => ''); throw new Error(`Agent reply failed (${response.status}): ${text}`); } + + const raw = await response.text().catch(() => ''); + if (!raw) { + return null; + } + + try { + const parsed = JSON.parse(raw) as { data?: Record } | Record; + const envelope = (parsed && typeof parsed === 'object' && 'data' in parsed ? parsed.data : parsed) as + | Record + | undefined; + + if (envelope && typeof envelope.messageId === 'string' && typeof envelope.platformThreadId === 'string') { + return { messageId: envelope.messageId, platformThreadId: envelope.platformThreadId }; + } + } catch { + // flush-only responses return null or an empty body; tolerate and fall through. + } + + return null; } } diff --git a/packages/framework/src/resources/agent/agent.test.ts b/packages/framework/src/resources/agent/agent.test.ts index 589b7cb5b3b..f8b236ed607 100644 --- a/packages/framework/src/resources/agent/agent.test.ts +++ b/packages/framework/src/resources/agent/agent.test.ts @@ -98,10 +98,18 @@ describe('agent dispatch via NovuRequestHandler', () => { beforeEach(() => { client = new Client({ secretKey: 'test-secret-key', strictAuthentication: false }); - fetchMock = vi.fn().mockResolvedValue({ - ok: true, - text: () => Promise.resolve('{}'), - json: () => Promise.resolve({ status: 'ok' }), + let counter = 0; + fetchMock = vi.fn().mockImplementation(() => { + counter += 1; + const body = { + data: { status: 'ok', messageId: `msg-${counter}`, platformThreadId: 'thread-1' }, + }; + + return Promise.resolve({ + ok: true, + text: () => Promise.resolve(JSON.stringify(body)), + json: () => Promise.resolve(body), + }); }); global.fetch = fetchMock as typeof fetch; }); @@ -225,12 +233,11 @@ describe('agent dispatch via NovuRequestHandler', () => { expect(replyBody.signals[1]).toEqual({ type: 'metadata', key: 'language', value: 'en' }); }); - it('should send update independently without signals', async () => { + it('should edit a previously sent reply via the returned handle', async () => { const testBot = agent('test-bot', { onMessage: async (ctx) => { - ctx.metadata.set('step', 'thinking'); - await ctx.update('Thinking...'); - await ctx.reply('Done thinking'); + const msg = await ctx.reply('Thinking...'); + await msg.edit('Done thinking'); }, }); @@ -261,16 +268,59 @@ describe('agent dispatch via NovuRequestHandler', () => { ); const parsedBodies = replyCalls.map(([, init]: any[]) => JSON.parse(init.body)); - const updateBody = parsedBodies.find((body: any) => body.update); - const replyBody = parsedBodies.find((body: any) => body.reply); + const initialReply = parsedBodies.find((body: any) => body.reply); + const editBody = parsedBodies.find((body: any) => body.edit); - expect(updateBody).toBeDefined(); - expect(updateBody.update.text).toBe('Thinking...'); - expect(updateBody.signals).toBeUndefined(); + expect(initialReply).toBeDefined(); + expect(initialReply.reply.text).toBe('Thinking...'); - expect(replyBody).toBeDefined(); - expect(replyBody.reply.text).toBe('Done thinking'); - expect(replyBody.signals).toHaveLength(1); + expect(editBody).toBeDefined(); + expect(editBody.edit.content.text).toBe('Done thinking'); + expect(editBody.edit.messageId).toBe('msg-1'); + expect(editBody.reply).toBeUndefined(); + expect(editBody.signals).toBeUndefined(); + }); + + it('should not attach signals or resolve to an edit call', async () => { + const testBot = agent('test-bot', { + onMessage: async (ctx) => { + ctx.metadata.set('step', 'thinking'); + const msg = await ctx.reply('Thinking...'); + await msg.edit('Done'); + }, + }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest(); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onMessage`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + await handler.createHandler()(); + await vi.waitFor(() => expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2)); + + const bodies = fetchMock.mock.calls + .filter((call: any[]) => call[0] === 'https://api.novu.co/v1/agents/test-bot/reply') + .map(([, init]: any[]) => JSON.parse(init.body)); + + const firstReply = bodies.find((b: any) => b.reply); + const edit = bodies.find((b: any) => b.edit); + + expect(firstReply.signals).toHaveLength(1); + expect(edit.signals).toBeUndefined(); + expect(edit.resolve).toBeUndefined(); }); it('should flush remaining signals after onResolve', async () => { @@ -531,11 +581,11 @@ describe('agent dispatch via NovuRequestHandler', () => { expect(replyBody.reply.markdown).toBeUndefined(); }); - it('should serialize CardElement on update', async () => { + it('should serialize CardElement on edit', async () => { const testBot = agent('test-bot', { onMessage: async (ctx) => { - await ctx.update(Card({ title: 'Loading...', children: [] })); - await ctx.reply('Done'); + const msg = await ctx.reply('Loading...'); + await msg.edit(Card({ title: 'Loaded', children: [] })); }, }); @@ -565,13 +615,14 @@ describe('agent dispatch via NovuRequestHandler', () => { ); const parsedBodies = replyCalls.map(([, init]: any[]) => JSON.parse(init.body)); - const updateBody = parsedBodies.find((body: any) => body.update); - expect(updateBody.update.card).toBeDefined(); - expect(updateBody.update.card.type).toBe('card'); - expect(updateBody.update.card.title).toBe('Loading...'); + const editBody = parsedBodies.find((body: any) => body.edit); + expect(editBody.edit.content.card).toBeDefined(); + expect(editBody.edit.content.card.type).toBe('card'); + expect(editBody.edit.content.card.title).toBe('Loaded'); + expect(editBody.edit.messageId).toBe('msg-1'); - const replyBody = parsedBodies.find((body: any) => body.reply); - expect(replyBody.reply.text).toBe('Done'); + const initialReply = parsedBodies.find((body: any) => body.reply); + expect(initialReply.reply.text).toBe('Loading...'); }); it('should batch signals with card reply', async () => { diff --git a/packages/framework/src/resources/agent/agent.types.ts b/packages/framework/src/resources/agent/agent.types.ts index d746e10fbc9..db4d8fa8b0a 100644 --- a/packages/framework/src/resources/agent/agent.types.ts +++ b/packages/framework/src/resources/agent/agent.types.ts @@ -84,7 +84,7 @@ export interface FileRef { } /** - * Content accepted by ctx.reply() and ctx.update(). + * Content accepted by ctx.reply() and handle.edit(). * * - `string` — plain text * - `{ markdown, files? }` — markdown-formatted text, optionally with file attachments @@ -117,6 +117,20 @@ export interface AgentReaction { message: AgentMessage | null; } +/** + * Handle to a message posted via ctx.reply(). Mirrors the chat SDK's `SentMessage` + * primitive: edits apply in-place on the platform (same platform message, content changes) + * and never post a new message. + */ +export interface ReplyHandle { + /** Platform-native message id (e.g. Slack ts, Teams activityId). */ + readonly messageId: string; + /** Platform-native thread id this message lives in. */ + readonly platformThreadId: string; + /** Edit this message in place with new content. Returns the same handle for chaining. */ + edit(content: MessageContent): Promise; +} + export interface AgentContext { readonly event: string; readonly action: AgentAction | null; @@ -128,8 +142,16 @@ export interface AgentContext { readonly platform: string; readonly platformContext: AgentPlatformContext; - reply(content: MessageContent): Promise; - update(content: MessageContent): Promise; + /** + * Post a message to the conversation and return a handle to it. + * Use the handle to edit the message in place later — no second post. + * + * @example + * const msg = await ctx.reply('Thinking…'); + * // ... do work ... + * await msg.edit('Here is the answer'); + */ + reply(content: MessageContent): Promise; resolve(summary?: string): void; metadata: { set(key: string, value: unknown): void; @@ -176,11 +198,23 @@ export type MetadataSignal = { type: 'metadata'; key: string; value: unknown }; export type TriggerSignal = { type: 'trigger'; workflowId: string; to?: string; payload?: Record }; export type Signal = MetadataSignal | TriggerSignal; +/** In-place edit of a previously posted agent message. Identified by platform message id. */ +export interface EditPayload { + messageId: string; + content: ReplyContent; +} + export interface AgentReplyPayload { conversationId: string; integrationIdentifier: string; reply?: ReplyContent; - update?: ReplyContent; + edit?: EditPayload; resolve?: { summary?: string }; signals?: Signal[]; } + +/** Shape returned by /agents/:id/reply when a reply or edit was delivered. */ +export interface SentMessageInfo { + messageId: string; + platformThreadId: string; +} diff --git a/packages/framework/src/resources/agent/index.ts b/packages/framework/src/resources/agent/index.ts index 30eac7a88a9..56d40614203 100644 --- a/packages/framework/src/resources/agent/index.ts +++ b/packages/framework/src/resources/agent/index.ts @@ -28,11 +28,14 @@ export type { AgentReaction, AgentReplyPayload, AgentSubscriber, + EditPayload, FileRef, MessageContent, MetadataSignal, ReplyContent, + ReplyHandle, Signal, + SentMessageInfo, TriggerSignal, } from './agent.types'; export { AgentEventEnum } from './agent.types';