Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/api/src/app/agents/dtos/agent-event.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export enum AgentEventEnum {
ON_MESSAGE = 'onMessage',
ON_ACTION = 'onAction',
ON_RESOLVE = 'onResolve',
ON_REACTION = 'onReaction',
}
117 changes: 116 additions & 1 deletion apps/api/src/app/agents/e2e/agent-webhook.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { testServer } from '@novu/testing';
import { expect } from 'chai';
import sinon from 'sinon';
import { AgentInboundHandler } from '../services/agent-inbound-handler.service';
import { AgentInboundHandler, InboundReactionEvent } from '../services/agent-inbound-handler.service';
import { BridgeExecutorService, BridgeExecutorParams } from '../services/bridge-executor.service';
import { AgentConfigResolver } from '../services/agent-config-resolver.service';
import { AgentEventEnum } from '../dtos/agent-event.enum';
Expand Down Expand Up @@ -359,4 +359,119 @@ describe('Agent Webhook - inbound flow #novu-v2', () => {
expect(remainingPlatformUsers.length).to.equal(0);
});
});

describe('Reaction handling', () => {
async function invokeReaction(threadId: string, reaction: InboundReactionEvent) {
const config = await configResolver.resolve(ctx.agentId, ctx.integrationIdentifier);
await inboundHandler.handleReaction(ctx.agentId, config, reaction);
}

it('should fire ON_REACTION bridge call for an existing conversation', async () => {
const threadId = `T_REACT_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT', text: 'React to this' });

await invokeInbound(threadId, msg);
bridgeCalls = [];

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'thumbs_up' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

expect(bridgeCalls.length).to.equal(1);
const call = bridgeCalls[0];
expect(call.event).to.equal(AgentEventEnum.ON_REACTION);
expect(call.reaction).to.exist;
expect(call.reaction!.emoji).to.equal('thumbs_up');
expect(call.reaction!.added).to.equal(true);
expect(call.reaction!.messageId).to.equal(msg.id);
});

it('should skip reaction when no conversation exists for the thread', async () => {
const reactionEvent: InboundReactionEvent = {
emoji: { name: 'wave' },
added: true,
messageId: 'msg-orphan',
thread: mockThread(`T_NOCONV_${Date.now()}`) as any,
};

await invokeReaction('ignored', reactionEvent);

expect(bridgeCalls.length).to.equal(0);
});

it('should skip reaction when thread context is missing', async () => {
const reactionEvent: InboundReactionEvent = {
emoji: { name: 'fire' },
added: false,
messageId: 'msg-no-thread',
};

await invokeReaction('ignored', reactionEvent);

expect(bridgeCalls.length).to.equal(0);
});

it('should include sourceMessage in reaction bridge call', async () => {
const threadId = `T_REACT_MSG_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT_MSG', text: 'Source message test', fullName: 'Jane Doe' });

await invokeInbound(threadId, msg);
bridgeCalls = [];

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'tada' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

expect(bridgeCalls.length).to.equal(1);
const call = bridgeCalls[0];
expect(call.reaction!.sourceMessage).to.exist;
expect(call.reaction!.sourceMessage!.text).to.equal('Source message test');
expect(call.reaction!.sourceMessage!.author.fullName).to.equal('Jane Doe');
});

it('should not persist conversation activity for reactions', async () => {
const threadId = `T_REACT_NOACT_${Date.now()}`;
const msg = mockMessage({ userId: 'U_REACT2', text: 'Activity test' });

await invokeInbound(threadId, msg);

const conversation = await conversationRepository.findByPlatformThread(
ctx.session.environment._id,
ctx.session.organization._id,
threadId
);
const activitiesBefore = await activityRepository.findByConversation(
ctx.session.environment._id,
conversation!._id
);

const reactionEvent: InboundReactionEvent = {
emoji: { name: 'heart' },
added: true,
messageId: msg.id,
message: msg as any,
thread: mockThread(threadId) as any,
};

await invokeReaction(threadId, reactionEvent);

const activitiesAfter = await activityRepository.findByConversation(
ctx.session.environment._id,
conversation!._id
);
expect(activitiesAfter.length).to.equal(activitiesBefore.length);
});
});
});
12 changes: 12 additions & 0 deletions apps/api/src/app/agents/e2e/mock-agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ const echoBot = agent('novu-agent', {
await ctx.reply(`Echo: ${userText}`);
},

onReaction: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] reaction: ${ctx.reaction?.emoji.name} (${ctx.reaction?.added ? 'added' : 'removed'})`);
console.log(`Reacted message: ${ctx.reaction?.message?.text ?? '(unavailable)'}`);
console.log('─────────────────────────────────────────');

const emoji = ctx.reaction?.emoji.name ?? 'unknown';
const added = ctx.reaction?.added ?? false;

await ctx.reply(`Got ${added ? '' : 'un'}reaction: :${emoji}:`);
},

onAction: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] action: ${ctx.action?.actionId} = ${ctx.action?.value ?? '(no value)'}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@ import { HandleAgentReply } from '../usecases/handle-agent-reply/handle-agent-re
import { ResolvedAgentConfig } from './agent-config-resolver.service';
import { AgentConversationService } from './agent-conversation.service';
import { AgentSubscriberResolver } from './agent-subscriber-resolver.service';
import { type BridgeAction, BridgeExecutorService, NoBridgeUrlError } from './bridge-executor.service';
import { type BridgeAction, type BridgeReaction, BridgeExecutorService, NoBridgeUrlError } from './bridge-executor.service';

const ONBOARDING_NO_BRIDGE_REPLY_MARKDOWN = `*You're connected to Novu*

Your bot is linked successfully. Go back to the *Novu dashboard* to complete onboarding.`;

export interface InboundReactionEvent {
emoji: { name: string };
added: boolean;
messageId: string;
message?: Message;
thread?: Thread;
user?: { userId: string; fullName?: string; userName?: string };
}

@Injectable()
export class AgentInboundHandler {
constructor(
Expand Down Expand Up @@ -154,6 +163,76 @@ export class AgentInboundHandler {
}
}

async handleReaction(
agentId: string,
config: ResolvedAgentConfig,
event: InboundReactionEvent
): Promise<void> {
const threadId = event.thread?.id;
if (!threadId) {
this.logger.warn(`[agent:${agentId}] Reaction received without thread context, skipping`);

return;
}

const conversation = await this.conversationRepository.findByPlatformThread(
config.environmentId,
config.organizationId,
threadId
);

if (!conversation) {
return;
Comment on lines +184 to +185
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add a blank line before return in this block.

This block violates the TS/JS style rule used in this repo.

Proposed fix
     if (!conversation) {
+
       return;
     }

As per coding guidelines: **/*.{ts,tsx,js,jsx} — “Include a blank line before every return statement”.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (!conversation) {
return;
if (!conversation) {
return;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/agents/services/agent-inbound-handler.service.ts` around
lines 183 - 184, In the if (!conversation) guard inside
AgentInboundHandlerService (the method handling inbound messages), add a single
blank line immediately before the return statement to satisfy the repository
style rule requiring a blank line before every return; locate the block that
checks "if (!conversation) { return; }" and change it so there's an empty line
between the opening brace and the return.

}

const platformUserId = event.user?.userId;

const subscriberId = platformUserId
? await this.subscriberResolver
.resolve({
environmentId: config.environmentId,
organizationId: config.organizationId,
platform: config.platform,
platformUserId,
integrationIdentifier: config.integrationIdentifier,
})
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Subscriber resolution failed for reaction, continuing without subscriber`);

return null;
})
: null;

const [subscriber, history] = await Promise.all([
subscriberId
? this.subscriberRepository.findBySubscriberId(config.environmentId, subscriberId)
: Promise.resolve(null),
this.conversationService.getHistory(config.environmentId, conversation._id),
]);

const reaction: BridgeReaction = {
emoji: event.emoji.name,
added: event.added,
messageId: event.messageId,
sourceMessage: event.message,
};

await this.bridgeExecutor.execute({
event: AgentEventEnum.ON_REACTION,
config,
conversation,
subscriber,
history,
message: null,
platformContext: {
threadId,
channelId: event.thread?.channelId ?? '',
isDM: event.thread?.isDM ?? false,
},
reaction,
});
}

async handleAction(
agentId: string,
config: ResolvedAgentConfig,
Expand Down
34 changes: 32 additions & 2 deletions apps/api/src/app/agents/services/bridge-executor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ export interface BridgePlatformContext {
isDM: boolean;
}

export interface BridgeReaction {
emoji: string;
added: boolean;
messageId: string;
sourceMessage?: Message;
}

export interface BridgeExecutorParams {
event: AgentEventEnum;
config: ResolvedAgentConfig;
Expand All @@ -35,6 +42,7 @@ export interface BridgeExecutorParams {
message: Message | null;
platformContext: BridgePlatformContext;
action?: BridgeAction;
reaction?: BridgeReaction;
}

interface BridgeMessageAuthor {
Expand Down Expand Up @@ -85,6 +93,13 @@ interface BridgeHistoryEntry {
createdAt: string;
}

interface BridgeReactionPayload {
messageId: string;
emoji: { name: string };
added: boolean;
message: BridgeMessage | null;
}

export interface AgentBridgeRequest {
version: 1;
timestamp: string;
Expand All @@ -101,6 +116,7 @@ export interface AgentBridgeRequest {
platform: string;
platformContext: BridgePlatformContext;
action: BridgeAction | null;
reaction: BridgeReactionPayload | null;
}

export class NoBridgeUrlError extends Error {
Expand Down Expand Up @@ -220,7 +236,7 @@ export class BridgeExecutorService {
}

private buildPayload(params: BridgeExecutorParams): AgentBridgeRequest {
const { event, config, conversation, subscriber, history, message, platformContext, action } = params;
const { event, config, conversation, subscriber, history, message, platformContext, action, reaction } = params;
const agentIdentifier = config.agentIdentifier;

const apiRootUrl = process.env.API_ROOT_URL || 'http://localhost:3000';
Expand All @@ -233,11 +249,13 @@ export class BridgeExecutorService {
deliveryId = `${conversation._id}:${message.id}`;
} else if (action) {
deliveryId = `${conversation._id}:${event}:${action.actionId}:${timestamp}`;
} else if (reaction) {
deliveryId = `${conversation._id}:${event}:${reaction.messageId}:${timestamp}`;
} else {
deliveryId = `${conversation._id}:${event}`;
}

return {
const payload: AgentBridgeRequest = {
version: 1,
timestamp,
deliveryId,
Expand All @@ -253,7 +271,10 @@ export class BridgeExecutorService {
platform: config.platform,
platformContext,
action: action ?? null,
reaction: reaction ? this.mapReaction(reaction) : null,
};

return payload;
}

private mapMessage(message: Message): BridgeMessage {
Expand Down Expand Up @@ -298,6 +319,15 @@ export class BridgeExecutorService {
};
}

private mapReaction(reaction: BridgeReaction): BridgeReactionPayload {
return {
messageId: reaction.messageId,
emoji: { name: reaction.emoji },
added: reaction.added,
message: reaction.sourceMessage ? this.mapMessage(reaction.sourceMessage) : null,
};
}

private mapHistory(activities: ConversationActivityEntity[]): BridgeHistoryEntry[] {
return [...activities].reverse().map((activity) => ({
role: activity.senderType,
Expand Down
15 changes: 15 additions & 0 deletions apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,5 +295,20 @@ export class ChatSdkService implements OnModuleDestroy {
this.logger.error(err, `[agent:${agentId}] Error handling action ${event.actionId}`);
}
});

chat.onReaction(async (event: any) => {
try {
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
user: event.user,
});
Comment on lines +299 to +308
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Skip malformed reaction events before forwarding.

Unlike the action path on Line 284, this branch forwards thread and user without checking them first. If an adapter emits a partial reaction event, this becomes a logged runtime error instead of a clean skip.

Suggested guard
     chat.onReaction(async (event: any) => {
       try {
+        if (!event.thread || !event.user) {
+          this.logger.warn(`[agent:${agentId}] Reaction received without complete context, skipping`);
+
+          return;
+        }
+
         await this.inboundHandler.handleReaction(agentId, config, {
           emoji: event.emoji,
           added: event.added,
           messageId: event.messageId,
           message: event.message,

As per coding guidelines, apps/api/**: Review with focus on security, authentication, and authorization. Check for proper error handling and input validation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
chat.onReaction(async (event: any) => {
try {
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
user: event.user,
});
chat.onReaction(async (event: any) => {
try {
if (!event.thread || !event.user) {
this.logger.warn(`[agent:${agentId}] Reaction received without complete context, skipping`);
return;
}
await this.inboundHandler.handleReaction(agentId, config, {
emoji: event.emoji,
added: event.added,
messageId: event.messageId,
message: event.message,
thread: event.thread,
user: event.user,
});
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/api/src/app/agents/services/chat-sdk.service.ts` around lines 299 - 308,
The chat.onReaction handler currently forwards event.thread and event.user
without validation causing runtime errors for partial/malformed reactions;
update the chat.onReaction callback to validate presence and shape of
event.thread and event.user (and any other required fields like event.messageId,
event.emoji) and skip/return early for malformed events instead of calling
this.inboundHandler.handleReaction(agentId, config, ...); ensure the guard logic
mirrors the action-path checks (validate thread and user exist and are
well-formed) before invoking inboundHandler.handleReaction so only complete
reaction events are forwarded.

} catch (err) {
this.logger.error(err, `[agent:${agentId}] Error handling reaction`);
}
});
}
}
Loading
Loading