Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/src/app/agents/dtos/agent-event.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export enum AgentEventEnum {
ON_MESSAGE = 'onMessage',
ON_ACTION = 'onAction',
ON_RESOLVE = 'onResolve',
ON_REACTION = 'onReaction',
}
117 changes: 116 additions & 1 deletion apps/api/src/app/agents/e2e/agent-webhook.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { testServer } from '@novu/testing';
import { expect } from 'chai';
import sinon from 'sinon';
import { AgentInboundHandler } from '../services/agent-inbound-handler.service';
import { AgentInboundHandler, InboundReactionEvent } from '../services/agent-inbound-handler.service';
import { BridgeExecutorService, BridgeExecutorParams } from '../services/bridge-executor.service';
import { AgentConfigResolver } from '../services/agent-config-resolver.service';
import { AgentEventEnum } from '../dtos/agent-event.enum';
Expand Down Expand Up @@ -359,4 +359,119 @@ describe('Agent Webhook - inbound flow #novu-v2', () => {
expect(remainingPlatformUsers.length).to.equal(0);
});
});

describe('Reaction handling', () => {
async function invokeReaction(threadId: string, reaction: InboundReactionEvent) {
const config = await configResolver.resolve(ctx.agentId, ctx.integrationIdentifier);
await inboundHandler.handleReaction(ctx.agentId, config, reaction);
}

it('should fire ON_REACTION bridge call for an existing conversation', async () => {
const threadId = `T_REACT_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT', text: 'React to this' });

await invokeInbound(threadId, msg);
bridgeCalls = [];

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'thumbs_up' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

expect(bridgeCalls.length).to.equal(1);
const call = bridgeCalls[0];
expect(call.event).to.equal(AgentEventEnum.ON_REACTION);
expect(call.reaction).to.exist;
expect(call.reaction!.emoji).to.equal('thumbs_up');
expect(call.reaction!.added).to.equal(true);
expect(call.reaction!.messageId).to.equal(msg.id);
});

it('should skip reaction when no conversation exists for the thread', async () => {
const reactionEvent: InboundReactionEvent = {
emoji: { name: 'wave' },
added: true,
messageId: 'msg-orphan',
thread: mockThread(`T_NOCONV_${Date.now()}`) as any,
};

await invokeReaction('ignored', reactionEvent);

expect(bridgeCalls.length).to.equal(0);
});

it('should skip reaction when thread context is missing', async () => {
const reactionEvent: InboundReactionEvent = {
emoji: { name: 'fire' },
added: false,
messageId: 'msg-no-thread',
};

await invokeReaction('ignored', reactionEvent);

expect(bridgeCalls.length).to.equal(0);
});

it('should include sourceMessage in reaction bridge call', async () => {
const threadId = `T_REACT_MSG_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT_MSG', text: 'Source message test', fullName: 'Jane Doe' });

await invokeInbound(threadId, msg);
bridgeCalls = [];

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'tada' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

expect(bridgeCalls.length).to.equal(1);
const call = bridgeCalls[0];
expect(call.reaction!.sourceMessage).to.exist;
expect(call.reaction!.sourceMessage!.text).to.equal('Source message test');
expect(call.reaction!.sourceMessage!.author.fullName).to.equal('Jane Doe');
});

it('should not persist conversation activity for reactions', async () => {
const threadId = `T_REACT_NOACT_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT2', text: 'Activity test' });

await invokeInbound(threadId, msg);

const conversation = await conversationRepository.findByPlatformThread(
ctx.session.environment._id,
ctx.session.organization._id,
threadId
);
const activitiesBefore = await activityRepository.findByConversation(
ctx.session.environment._id,
conversation!._id
);

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'heart' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

const activitiesAfter = await activityRepository.findByConversation(
ctx.session.environment._id,
conversation!._id
);
expect(activitiesAfter.length).to.equal(activitiesBefore.length);
});
});
});
12 changes: 12 additions & 0 deletions apps/api/src/app/agents/e2e/mock-agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ const echoBot = agent('novu-agent', {
await ctx.reply(`Echo: ${userText}`);
},

onReaction: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] reaction: ${ctx.reaction?.emoji.name} (${ctx.reaction?.added ? 'added' : 'removed'})`);
console.log(`Reacted message: ${ctx.reaction?.message?.text ?? '(unavailable)'}`);
console.log('─────────────────────────────────────────');

const emoji = ctx.reaction?.emoji.name ?? 'unknown';
const added = ctx.reaction?.added ?? false;

await ctx.reply(`Got ${added ? '' : 'un'}reaction: :${emoji}:`);
},

onAction: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] action: ${ctx.action?.actionId} = ${ctx.action?.value ?? '(no value)'}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,20 @@ import { HandleAgentReply } from '../usecases/handle-agent-reply/handle-agent-re
import { ResolvedAgentConfig } from './agent-config-resolver.service';
import { AgentConversationService } from './agent-conversation.service';
import { AgentSubscriberResolver } from './agent-subscriber-resolver.service';
import { type BridgeAction, BridgeExecutorService, NoBridgeUrlError } from './bridge-executor.service';
import { type BridgeAction, type BridgeReaction, BridgeExecutorService, NoBridgeUrlError } from './bridge-executor.service';

const ONBOARDING_NO_BRIDGE_REPLY_MARKDOWN = `*You're connected to Novu*

Your bot is linked successfully. Go back to the *Novu dashboard* to complete onboarding.`;

export interface InboundReactionEvent {
emoji: { name: string };
added: boolean;
messageId: string;
message?: Message;
thread?: Thread;
}

@Injectable()
export class AgentInboundHandler {
constructor(
Expand Down Expand Up @@ -154,6 +162,76 @@ export class AgentInboundHandler {
}
}

async handleReaction(
agentId: string,
config: ResolvedAgentConfig,
event: InboundReactionEvent
): Promise<void> {
const threadId = event.thread?.id;
if (!threadId) {
this.logger.warn(`[agent:${agentId}] Reaction received without thread context, skipping`);

return;
}

const conversation = await this.conversationRepository.findByPlatformThread(
config.environmentId,
config.organizationId,
threadId
);

if (!conversation) {
return;
Comment on lines +184 to +185
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add a blank line before return in this block.

This block violates the TS/JS style rule used in this repo.

Proposed fix
     if (!conversation) {
+
       return;
     }

As per coding guidelines: **/*.{ts,tsx,js,jsx} — “Include a blank line before every return statement”.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!conversation) {
return;
if (!conversation) {
return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/agents/services/agent-inbound-handler.service.ts` around
lines 183 - 184, In the if (!conversation) guard inside
AgentInboundHandlerService (the method handling inbound messages), add a single
blank line immediately before the return statement to satisfy the repository
style rule requiring a blank line before every return; locate the block that
checks "if (!conversation) { return; }" and change it so there's an empty line
between the opening brace and the return.

}

const platformUserId = event.message?.author?.userId;

const subscriberId = platformUserId
? await this.subscriberResolver
.resolve({
environmentId: config.environmentId,
organizationId: config.organizationId,
platform: config.platform,
platformUserId,
integrationIdentifier: config.integrationIdentifier,
})
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Subscriber resolution failed for reaction, continuing without subscriber`);

return null;
})
: null;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

const [subscriber, history] = await Promise.all([
subscriberId
? this.subscriberRepository.findBySubscriberId(config.environmentId, subscriberId)
: Promise.resolve(null),
this.conversationService.getHistory(config.environmentId, conversation._id),
]);

const reaction: BridgeReaction = {
emoji: event.emoji.name,
added: event.added,
messageId: event.messageId,
sourceMessage: event.message,
};

await this.bridgeExecutor.execute({
event: AgentEventEnum.ON_REACTION,
config,
conversation,
subscriber,
history,
message: null,
platformContext: {
threadId,
channelId: event.thread?.channelId ?? '',
isDM: event.thread?.isDM ?? false,
},
reaction,
});
}

async handleAction(
agentId: string,
config: ResolvedAgentConfig,
Expand Down
32 changes: 30 additions & 2 deletions apps/api/src/app/agents/services/bridge-executor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ export interface BridgePlatformContext {
isDM: boolean;
}

export interface BridgeReaction {
emoji: string;
added: boolean;
messageId: string;
sourceMessage?: Message;
}

export interface BridgeExecutorParams {
event: AgentEventEnum;
config: ResolvedAgentConfig;
Expand All @@ -35,6 +42,7 @@ export interface BridgeExecutorParams {
message: Message | null;
platformContext: BridgePlatformContext;
action?: BridgeAction;
reaction?: BridgeReaction;
}

interface BridgeMessageAuthor {
Expand Down Expand Up @@ -85,6 +93,12 @@ interface BridgeHistoryEntry {
createdAt: string;
}

interface BridgeReactionPayload {
emoji: { name: string };
added: boolean;
message: BridgeMessage | null;
}

export interface AgentBridgeRequest {
version: 1;
timestamp: string;
Expand All @@ -101,6 +115,7 @@ export interface AgentBridgeRequest {
platform: string;
platformContext: BridgePlatformContext;
action: BridgeAction | null;
reaction: BridgeReactionPayload | null;
}

export class NoBridgeUrlError extends Error {
Expand Down Expand Up @@ -220,7 +235,7 @@ export class BridgeExecutorService {
}

private buildPayload(params: BridgeExecutorParams): AgentBridgeRequest {
const { event, config, conversation, subscriber, history, message, platformContext, action } = params;
const { event, config, conversation, subscriber, history, message, platformContext, action, reaction } = params;
const agentIdentifier = config.agentIdentifier;

const apiRootUrl = process.env.API_ROOT_URL || 'http://localhost:3000';
Expand All @@ -233,11 +248,13 @@ export class BridgeExecutorService {
deliveryId = `${conversation._id}:${message.id}`;
} else if (action) {
deliveryId = `${conversation._id}:${event}:${action.actionId}:${timestamp}`;
} else if (reaction) {
deliveryId = `${conversation._id}:${event}:${reaction.messageId}:${timestamp}`;
} else {
deliveryId = `${conversation._id}:${event}`;
}

return {
const payload: AgentBridgeRequest = {
version: 1,
timestamp,
deliveryId,
Expand All @@ -253,7 +270,10 @@ export class BridgeExecutorService {
platform: config.platform,
platformContext,
action: action ?? null,
reaction: reaction ? this.mapReaction(reaction) : null,
};

return payload;
}

private mapMessage(message: Message): BridgeMessage {
Expand Down Expand Up @@ -298,6 +318,14 @@ export class BridgeExecutorService {
};
}

private mapReaction(reaction: BridgeReaction): BridgeReactionPayload {
return {
emoji: { name: reaction.emoji },
added: reaction.added,
message: reaction.sourceMessage ? this.mapMessage(reaction.sourceMessage) : null,
};
}

private mapHistory(activities: ConversationActivityEntity[]): BridgeHistoryEntry[] {
return [...activities].reverse().map((activity) => ({
role: activity.senderType,
Expand Down
14 changes: 14 additions & 0 deletions apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,5 +295,19 @@ export class ChatSdkService implements OnModuleDestroy {
this.logger.error(err, `[agent:${agentId}] Error handling action ${event.actionId}`);
}
});

chat.onReaction(async (event: any) => {
try {
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
});
Comment on lines +299 to +308
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Skip malformed reaction events before forwarding.

Unlike the action path on Line 284, this branch forwards thread and user without checking them first. If an adapter emits a partial reaction event, this becomes a logged runtime error instead of a clean skip.

Suggested guard
     chat.onReaction(async (event: any) => {
       try {
+        if (!event.thread || !event.user) {
+          this.logger.warn(`[agent:${agentId}] Reaction received without complete context, skipping`);
+
+          return;
+        }
+
         await this.inboundHandler.handleReaction(agentId, config, {
           emoji: event.emoji,
           added: event.added,
           messageId: event.messageId,
           message: event.message,

As per coding guidelines, apps/api/**: Review with focus on security, authentication, and authorization. Check for proper error handling and input validation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
chat.onReaction(async (event: any) => {
try {
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
user: event.user,
});
chat.onReaction(async (event: any) => {
try {
if (!event.thread || !event.user) {
this.logger.warn(`[agent:${agentId}] Reaction received without complete context, skipping`);
return;
}
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
user: event.user,
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/agents/services/chat-sdk.service.ts` around lines 299 - 308,
The chat.onReaction handler currently forwards event.thread and event.user
without validation causing runtime errors for partial/malformed reactions;
update the chat.onReaction callback to validate presence and shape of
event.thread and event.user (and any other required fields like event.messageId,
event.emoji) and skip/return early for malformed events instead of calling
this.inboundHandler.handleReaction(agentId, config, ...); ensure the guard logic
mirrors the action-path checks (validate thread and user exist and are
well-formed) before invoking inboundHandler.handleReaction so only complete
reaction events are forwarded.

} catch (err) {
this.logger.error(err, `[agent:${agentId}] Error handling reaction`);
}
});
}
}
24 changes: 13 additions & 11 deletions packages/framework/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,20 +297,22 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
}

private async runAgentHandler(registeredAgent: Agent, event: string, ctx: AgentContextImpl): Promise<void> {
if (event === AgentEventEnum.ON_RESOLVE) {
if (registeredAgent.handlers.onResolve) {
await registeredAgent.handlers.onResolve(ctx);
}
} else if (event === AgentEventEnum.ON_ACTION) {
if (registeredAgent.handlers.onAction) {
await registeredAgent.handlers.onAction(ctx);
}
} else if (event === AgentEventEnum.ON_MESSAGE) {
await registeredAgent.handlers.onMessage(ctx);
} else {
const handlerMap: Record<string, ((ctx: AgentContextImpl) => Promise<void>) | undefined> = {
[AgentEventEnum.ON_MESSAGE]: registeredAgent.handlers.onMessage,
[AgentEventEnum.ON_REACTION]: registeredAgent.handlers.onReaction,
[AgentEventEnum.ON_ACTION]: registeredAgent.handlers.onAction,
[AgentEventEnum.ON_RESOLVE]: registeredAgent.handlers.onResolve,
};

if (!(event in handlerMap)) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
throw new InvalidActionError(event, AgentEventEnum);
}

const handler = handlerMap[event];
if (handler) {
await handler(ctx);
}

await ctx.flush();
}

Expand Down
Loading
Loading