Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/src/app/agents/dtos/agent-event.enum.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export enum AgentEventEnum {
ON_MESSAGE = 'onMessage',
ON_ACTION = 'onAction',
ON_RESOLVE = 'onResolve',
}
70 changes: 60 additions & 10 deletions apps/api/src/app/agents/dtos/agent-reply-payload.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,72 @@ import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { Type } from 'class-transformer';
import {
IsArray,
IsDefined,
IsIn,
IsNotEmpty,
IsObject,
IsOptional,
IsString,
MaxLength,
Validate,
ValidateNested,
ValidatorConstraint,
ValidatorConstraintInterface,
} from 'class-validator';

const SIGNAL_TYPES = ['metadata', 'trigger'] as const;

export class TextContentDto {
@ApiProperty()
export interface FileRef {
filename: string;
mimeType?: string;
data?: string;
url?: string;
}

@ValidatorConstraint({ name: 'isValidReplyContent', async: false })
export class IsValidReplyContent implements ValidatorConstraintInterface {
validate(content: ReplyContentDto): boolean {
if (!content) return true;

const fields = [content.text, content.markdown, content.card].filter((v) => v !== undefined);
if (fields.length !== 1) return false;

if (content.files?.length && !content.markdown) return false;

for (const file of content.files ?? []) {
const sources = [file.data, file.url].filter(Boolean);
if (sources.length !== 1) return false;
}

return true;
}

defaultMessage(): string {
return 'Content must have exactly one of text, markdown, or card. Files only allowed with markdown. Each file needs exactly one of data or url.';
}
}

export class ReplyContentDto {
@ApiPropertyOptional()
@IsOptional()
@IsString()
@IsNotEmpty()
@MaxLength(40_000)
text: string;
text?: string;

@ApiPropertyOptional()
@IsOptional()
@IsString()
markdown?: string;

@ApiPropertyOptional()
@IsOptional()
@IsObject()
card?: Record<string, unknown>;

@ApiPropertyOptional()
@IsOptional()
@IsArray()
files?: FileRef[];
}

export class ResolveDto {
Expand Down Expand Up @@ -71,19 +119,21 @@ export class AgentReplyPayloadDto {
@IsNotEmpty()
integrationIdentifier: string;

@ApiPropertyOptional({ type: TextContentDto })
@ApiPropertyOptional({ type: ReplyContentDto })
@IsOptional()
@IsObject()
@ValidateNested()
@Type(() => TextContentDto)
reply?: TextContentDto;
@Validate(IsValidReplyContent)
@Type(() => ReplyContentDto)
reply?: ReplyContentDto;

@ApiPropertyOptional({ type: TextContentDto })
@ApiPropertyOptional({ type: ReplyContentDto })
@IsOptional()
@IsObject()
@ValidateNested()
@Type(() => TextContentDto)
update?: TextContentDto;
@Validate(IsValidReplyContent)
@Type(() => ReplyContentDto)
update?: ReplyContentDto;

@ApiPropertyOptional({ type: ResolveDto })
@IsOptional()
Expand Down
121 changes: 119 additions & 2 deletions apps/api/src/app/agents/e2e/mock-agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@
* 5. @mention the bot in Slack — watch the round-trip in the logs
*/

import { agent, Client, serve } from '@novu/framework/express';
import {
agent,
serve,
Client,
Actions,
Button,
Card,
CardLink,
CardText,
Divider,
Select,
SelectOption,
} from '@novu/framework/express';
import express from 'express';

const NOVU_SECRET_KEY = process.env.NOVU_SECRET_KEY;
Expand Down Expand Up @@ -47,9 +59,109 @@ const echoBot = agent('novu-agent', {
return;
}

if (userText.toLowerCase().includes('card')) {
await ctx.reply(
Card({
title: `Order #${Math.floor(Math.random() * 9000) + 1000}`,
children: [
CardText('Your order is ready for pickup.'),
Actions([
Button({ id: 'confirm', label: 'Confirm Pickup', style: 'primary' }),
Button({ id: 'cancel', label: 'Cancel Order', style: 'danger' }),
]),
],
})
);

return;
}

if (userText.toLowerCase().includes('incident')) {
await ctx.reply(
Card({
title: `Incident #${Math.floor(Math.random() * 9000) + 1000} — DB Latency Spike`,
children: [
CardText('*P1 — Production database latency spike*'),
CardText('Detected at 14:32 UTC. Response times exceeded 2s threshold for 3 minutes.'),
Divider(),
CardText('*Status:* Investigating | *Service:* payments-api | *Region:* us-east-1'),
Divider(),
Select({
id: 'assign',
label: 'Assign to on-call',
options: [
SelectOption({ value: 'alice', label: 'Alice Chen' }),
SelectOption({ value: 'bob', label: 'Bob Martinez' }),
SelectOption({ value: 'carol', label: 'Carol Wu' }),
],
}),
Actions([
Button({ id: 'ack', label: 'Acknowledge', style: 'primary' }),
Button({ id: 'escalate', label: 'Escalate', style: 'danger' }),
]),
CardLink({ url: 'https://grafana.example.com/d/abc', label: 'View Grafana Dashboard' }),
],
})
);

return;
}

if (userText.toLowerCase().includes('markdown')) {
await ctx.reply({
markdown: [
`**Echo:** ${userText}`,
'',
'| Metric | Value |',
'|--------|-------|',
'| Latency | 142ms |',
'| Throughput | 1.2k rps |',
'| Error rate | 0.02% |',
'',
'> Sent from _Novu Agent Framework_',
].join('\n'),
});

return;
}

await ctx.reply(`Echo: ${userText}`);
},

onAction: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] action: ${ctx.action?.actionId} = ${ctx.action?.value ?? '(no value)'}`);
console.log('─────────────────────────────────────────');

const actionId = ctx.action?.actionId ?? 'unknown';
const value = ctx.action?.value;

if (actionId === 'ack') {
await ctx.reply(
Card({
title: 'Incident Acknowledged',
children: [
CardText(
`Acknowledged by *${ctx.subscriber?.firstName ?? 'unknown'}* at ${new Date().toLocaleTimeString()}.`
),
Actions([Button({ id: 'resolve', label: 'Resolve Incident', style: 'primary' })]),
],
})
);
} else if (actionId === 'resolve') {
ctx.resolve('Incident resolved via action');
await ctx.reply(`Incident resolved by *${ctx.subscriber?.firstName ?? 'unknown'}*.`);
} else if (actionId === 'assign') {
await ctx.reply(`On-call assignment updated to *${value}*.`);
} else if (actionId === 'escalate') {
await ctx.reply({
markdown: `**Escalated** — paging the secondary on-call team.\n\n_Triggered by ${ctx.subscriber?.firstName ?? 'unknown'}_`,
});
} else {
await ctx.reply(`Got action: *${actionId}*${value ? ` = ${value}` : ''}`);
}
Comment on lines +139 to +162
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Handle the resolve action explicitly.

The ack card renders a resolve button, but onAction never matches it, so clicking Resolve Incident falls into the generic fallback instead of exercising the resolve path.

Proposed fix
     if (actionId === 'ack') {
       await ctx.reply(
         Card({
           title: 'Incident Acknowledged',
           children: [
             CardText(
               `Acknowledged by *${ctx.subscriber?.firstName ?? 'unknown'}* at ${new Date().toLocaleTimeString()}.`
             ),
             Actions([Button({ id: 'resolve', label: 'Resolve Incident', style: 'primary' })]),
           ],
         })
       );
+    } else if (actionId === 'resolve') {
+      ctx.resolve('Incident resolved via action');
+      await ctx.reply('Incident resolved.');
     } else if (actionId === 'assign') {
       await ctx.reply(`On-call assignment updated to *${value}*.`);
     } else if (actionId === 'escalate') {
       await ctx.reply({
         markdown: `**Escalated** — paging the secondary on-call team.\n\n_Triggered by ${ctx.subscriber?.firstName ?? 'unknown'}_`,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (actionId === 'ack') {
await ctx.reply(
Card({
title: 'Incident Acknowledged',
children: [
CardText(
`Acknowledged by *${ctx.subscriber?.firstName ?? 'unknown'}* at ${new Date().toLocaleTimeString()}.`
),
Actions([Button({ id: 'resolve', label: 'Resolve Incident', style: 'primary' })]),
],
})
);
} else if (actionId === 'assign') {
await ctx.reply(`On-call assignment updated to *${value}*.`);
} else if (actionId === 'escalate') {
await ctx.reply({
markdown: `**Escalated** — paging the secondary on-call team.\n\n_Triggered by ${ctx.subscriber?.firstName ?? 'unknown'}_`,
});
} else {
await ctx.reply(`Got action: *${actionId}*${value ? ` = ${value}` : ''}`);
}
if (actionId === 'ack') {
await ctx.reply(
Card({
title: 'Incident Acknowledged',
children: [
CardText(
`Acknowledged by *${ctx.subscriber?.firstName ?? 'unknown'}* at ${new Date().toLocaleTimeString()}.`
),
Actions([Button({ id: 'resolve', label: 'Resolve Incident', style: 'primary' })]),
],
})
);
} else if (actionId === 'resolve') {
ctx.resolve('Incident resolved via action');
await ctx.reply('Incident resolved.');
} else if (actionId === 'assign') {
await ctx.reply(`On-call assignment updated to *${value}*.`);
} else if (actionId === 'escalate') {
await ctx.reply({
markdown: `**Escalated** — paging the secondary on-call team.\n\n_Triggered by ${ctx.subscriber?.firstName ?? 'unknown'}_`,
});
} else {
await ctx.reply(`Got action: *${actionId}*${value ? ` = ${value}` : ''}`);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/agents/e2e/mock-agent-handler.ts` around lines 139 - 159,
The ack card includes a Button({ id: 'resolve', ... }) but the handler never
checks for actionId === 'resolve', so clicks fall to the fallback; add an
explicit branch checking actionId === 'resolve' (near the existing checks for
'ack', 'assign', 'escalate') and call ctx.reply with an appropriate response
(e.g., send a Card or markdown confirming "Incident Resolved" and who resolved
it using ctx.subscriber?.firstName and timestamp), ensuring the resolve action
path mirrors how 'ack' and 'escalate' use Card/markdown and Buttons.

},

onResolve: async (ctx) => {
console.log(`\n[onResolve] Conversation ${ctx.conversation.identifier} closed.`);
ctx.metadata.set('resolvedAt', new Date().toISOString());
Expand All @@ -70,7 +182,12 @@ app.use(
})
);

app.listen(PORT, () => {
const server = app.listen(PORT, () => {
console.log(`\nAgent Handler (using @novu/framework) running on http://localhost:${PORT}/api/novu`);
console.log('\nWaiting for bridge calls...\n');
});

server.on('error', (err) => console.error('Server error:', err));
server.on('close', () => console.log('Server closed'));
process.on('uncaughtException', (err) => console.error('Uncaught:', err));
process.on('unhandledRejection', (err) => console.error('Unhandled rejection:', err));
78 changes: 76 additions & 2 deletions apps/api/src/app/agents/services/agent-inbound-handler.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { PinoLogger } from '@novu/application-generic';
import { ConversationActivitySenderTypeEnum, ConversationParticipantTypeEnum, SubscriberRepository } from '@novu/dal';
import type { Message, Thread } from 'chat';
import { AgentEventEnum } from '../dtos/agent-event.enum';
import { ResolvedPlatformConfig } from './agent-credential.service';
import { AgentConversationService } from './agent-conversation.service';
import { ResolvedPlatformConfig } from './agent-credential.service';
import { AgentSubscriberResolver } from './agent-subscriber-resolver.service';
import { BridgeExecutorService } from './bridge-executor.service';
import { type BridgeAction, BridgeExecutorService } from './bridge-executor.service';

@Injectable()
export class AgentInboundHandler {
Expand Down Expand Up @@ -107,4 +107,78 @@ export class AgentInboundHandler {
},
});
}

async handleAction(
agentId: string,
config: ResolvedPlatformConfig,
thread: Thread,
action: BridgeAction,
userId: string
): Promise<void> {
const subscriberId = await this.subscriberResolver
.resolve({
environmentId: config.environmentId,
organizationId: config.organizationId,
platform: config.platform,
platformUserId: userId,
integrationIdentifier: config.integrationIdentifier,
})
.catch((err) => {
this.logger.warn(
err,
`[agent:${agentId}] Subscriber resolution failed for action, continuing without subscriber`
);

return null;
});

const participantId = subscriberId ?? `${config.platform}:${userId}`;
const participantType = subscriberId
? ConversationParticipantTypeEnum.SUBSCRIBER
: ConversationParticipantTypeEnum.PLATFORM_USER;

const conversation = await this.conversationService.createOrGetConversation({
environmentId: config.environmentId,
organizationId: config.organizationId,
agentId,
platform: config.platform,
integrationId: config.integrationId,
platformThreadId: thread.id,
participantId,
participantType,
platformUserId: userId,
firstMessageText: `[action:${action.actionId}]`,
});

const serializedThread = thread.toJSON() as unknown as Record<string, unknown>;
await this.conversationService.updateChannelThread(
config.environmentId,
config.organizationId,
conversation._id,
thread.id,
serializedThread
);

const [subscriber, history] = await Promise.all([
subscriberId
? this.subscriberRepository.findBySubscriberId(config.environmentId, subscriberId)
: Promise.resolve(null),
this.conversationService.getHistory(config.environmentId, conversation._id),
]);

await this.bridgeExecutor.execute({
event: AgentEventEnum.ON_ACTION,
config,
conversation,
subscriber,
history,
message: null,
platformContext: {
threadId: thread.id,
channelId: thread.channelId,
isDM: thread.isDM,
},
action,
});
}
}
Loading
Loading