Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions apps/api/src/app/agents/dtos/agent-event.enum.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
export enum AgentEventEnum {
ON_START = 'onStart',
ON_MESSAGE = 'onMessage',
ON_ACTION = 'onAction',
ON_RESOLVE = 'onResolve',
}
77 changes: 77 additions & 0 deletions apps/api/src/app/agents/e2e/mock-agent-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Agent Handler — E2E test utility using @novu/framework
*
* This is a real serve() endpoint that uses the agent SDK to handle bridge calls.
* Run alongside the Novu API to test the full agent round-trip with Slack.
*
* Usage:
* NOVU_SECRET_KEY=<your-env-secret-key> npx ts-node apps/api/src/app/agents/e2e/mock-agent-handler.ts
*
* Setup:
* 1. Start Novu API: pnpm start:api:dev
* 2. Set environment bridge URL to http://localhost:4111/api/novu (dashboard or direct DB update)
* 3. Create an agent + link a Slack integration via the API
* 4. Point Slack event subscriptions to your Novu webhook URL (ngrok/tunnel)
* 5. @mention the bot in Slack — watch the round-trip in the logs
*/

import { agent, Client, serve } from '@novu/framework/express';
import express from 'express';

const NOVU_SECRET_KEY = process.env.NOVU_SECRET_KEY;
const PORT = Number(process.env.MOCK_PORT) || 4111;

if (!NOVU_SECRET_KEY) {
console.error('NOVU_SECRET_KEY is required. Set it to your environment secret key.');
process.exit(1);
}

const echoBot = agent('novu-agent', {
onMessage: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] from ${ctx.subscriber?.firstName ?? 'unknown'} on ${ctx.platform}`);
console.log(`Message: ${ctx.message?.text ?? '(none)'}`);
console.log(`Conversation: ${ctx.conversation.identifier} (${ctx.conversation.status})`);
console.log(`History: ${ctx.history.length} entries`);
console.log('─────────────────────────────────────────');

const userText = ctx.message?.text ?? '';
const turnCount = (ctx.conversation.metadata?.turnCount as number) ?? 0;

ctx.metadata.set('turnCount', turnCount + 1);

if (userText.toLowerCase().includes('done')) {
ctx.resolve(`Conversation resolved after ${turnCount + 1} turns`);
await ctx.reply('Thanks for chatting! Resolving this conversation.');

return;
}

await ctx.reply(`Echo: ${userText}`);
},

onResolve: async (ctx) => {
console.log(`\n[onResolve] Conversation ${ctx.conversation.identifier} closed.`);
ctx.metadata.set('resolvedAt', new Date().toISOString());
},
});

const app = express();
app.use(express.json());

app.use(
'/api/novu',
serve({
agents: [echoBot],
client: new Client({
secretKey: NOVU_SECRET_KEY,
strictAuthentication: false,
}),
})
);

app.listen(PORT, () => {
console.log(`\nAgent Handler (using @novu/framework) running on http://localhost:${PORT}/api/novu`);
console.log(`Secret Key: ${NOVU_SECRET_KEY.slice(0, 10)}...`);

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This logs sensitive data returned by
process environment
as clear text.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
console.log('\nWaiting for bridge calls...\n');
});
2 changes: 1 addition & 1 deletion apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ export class ChatSdkService implements OnModuleDestroy {
chat.onNewMention(async (thread: Thread, message: Message) => {
try {
await thread.subscribe();
await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_START);
await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_MESSAGE);
} catch (err) {
this.logger.error(err, `[agent:${agentId}] Error handling new mention`);
}
Expand Down
30 changes: 21 additions & 9 deletions packages/framework/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
StepNotFoundError,
WorkflowNotFoundError,
} from './errors';
import type { Agent } from './resources/agent';
import { mockSchema } from './jsonSchemaFaker';
import { prettyPrintDiscovery } from './resources/workflow/pretty-print-discovery';
import type {
Expand Down Expand Up @@ -52,6 +53,7 @@ function isRuntimeInDevelopment() {
export class Client {
private discoveredWorkflows = new Map<string, DiscoverWorkflowOutput>();
private discoverWorkflowPromises = new Map<string, Promise<void>>();
private registeredAgents = new Map<string, Agent>();

private templateEngine: Liquid;

Expand Down Expand Up @@ -128,6 +130,16 @@ export class Client {
}
}

public addAgents(agents: Array<Agent>): void {
for (const a of agents) {
this.registeredAgents.set(a.id, a);
}
}

public getAgent(agentId: string): Agent | undefined {
return this.registeredAgents.get(agentId);
}

private async addWorkflow(workflow: Workflow): Promise<void> {
try {
const definition = await workflow.discover();
Expand Down Expand Up @@ -402,12 +414,12 @@ export class Client {
}

public async executeWorkflow(event: Event): Promise<ExecuteOutput> {
const actionMessages = {
const actionMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executing',
[PostActionEnum.PREVIEW]: 'Previewing',
} as const;
};

const actionMessage = actionMessages[event.action];
const actionMessage = actionMessages[event.action] || event.action;

const actionMessageFormatted = `${actionMessage} workflowId:`;
this.log(`\n${log.bold(log.underline(actionMessageFormatted))} '${event.workflowId}'`);
Expand Down Expand Up @@ -495,11 +507,11 @@ export class Client {
const elapsedTimeInMilliseconds = elapsedSeconds * 1_000 + elapsedNanoseconds / 1_000_000;

const emoji = executionError ? EMOJI.ERROR : EMOJI.SUCCESS;
const resultMessages = {
const resultMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executed',
[PostActionEnum.PREVIEW]: 'Previewed',
} as const;
const resultMessage = resultMessages[event.action];
};
const resultMessage = resultMessages[event.action] || event.action;

this.log(`${emoji} ${resultMessage} workflowId: \`${event.workflowId}\``);

Expand Down Expand Up @@ -547,11 +559,11 @@ export class Client {
if (!this.verbose) return;

const successPrefix = error ? EMOJI.ERROR : EMOJI.SUCCESS;
const actionMessages = {
const actionMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executed',
[PostActionEnum.PREVIEW]: 'Previewed',
} as const;
const actionMessage = actionMessages[event.action];
};
const actionMessage = actionMessages[event.action] || event.action;
const message = error ? 'Failed to execute' : actionMessage;
const executionLog = error ? log.error : log.success;
const logMessage = `${successPrefix} ${message} workflowId: '${event.workflowId}`;
Expand Down
1 change: 1 addition & 0 deletions packages/framework/src/constants/action.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export enum PostActionEnum {
TRIGGER = 'trigger',
EXECUTE = 'execute',
PREVIEW = 'preview',
AGENT_EVENT = 'agent-event',
}

export enum GetActionEnum {
Expand Down
2 changes: 2 additions & 0 deletions packages/framework/src/constants/http-query.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ export enum HttpQueryKeysEnum {
STEP_ID = 'stepId',
ACTION = 'action',
SOURCE = 'source',
AGENT_ID = 'agentId',
EVENT = 'event',
}
48 changes: 43 additions & 5 deletions packages/framework/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ import {
SigningKeyNotFoundError,
} from './errors';
import { isPlatformError } from './errors/guard.errors';
import { AgentContextImpl, AgentEventEnum } from './resources/agent';
import type { Agent, AgentBridgeRequest } from './resources/agent';
import type { Awaitable, EventTriggerParams, Workflow } from './types';
import { createHmacSubtle, initApiClient } from './utils';

export type ServeHandlerOptions = {
client?: Client;
workflows: Array<Workflow>;
workflows?: Array<Workflow>;
agents?: Array<Agent>;
};

export type INovuRequestHandlerOptions<Input extends any[] = any[], Output = any> = ServeHandlerOptions & {
frameworkName: string;
client?: Client;
workflows: Array<Workflow>;
workflows?: Array<Workflow>;
agents?: Array<Agent>;
handler: Handler<Input, Output>;
};

Expand Down Expand Up @@ -62,14 +66,17 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
private readonly hmacEnabled: boolean;
private readonly http;
private readonly workflows: Array<Workflow>;
private readonly agents: Array<Agent>;

constructor(options: INovuRequestHandlerOptions<Input, Output>) {
this.handler = options.handler;
this.client = options.client ? options.client : new Client();
this.workflows = options.workflows;
this.workflows = options.workflows || [];
this.agents = options.agents || [];
this.http = initApiClient(this.client.secretKey, this.client.apiUrl);
this.frameworkName = options.frameworkName;
this.hmacEnabled = this.client.strictAuthentication;
this.client.addAgents(this.agents);
}

public createHandler(): (...args: Input) => Promise<Output> {
Expand Down Expand Up @@ -129,6 +136,8 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
const action = url.searchParams.get(HttpQueryKeysEnum.ACTION) || GetActionEnum.HEALTH_CHECK;
const workflowId = url.searchParams.get(HttpQueryKeysEnum.WORKFLOW_ID) || '';
const stepId = url.searchParams.get(HttpQueryKeysEnum.STEP_ID) || '';
const agentId = url.searchParams.get(HttpQueryKeysEnum.AGENT_ID) || '';
const agentEvent = url.searchParams.get(HttpQueryKeysEnum.EVENT) || '';
const signatureHeader = (await actions.headers(HttpHeaderKeysEnum.NOVU_SIGNATURE)) || '';

let body: Record<string, unknown> = {};
Expand All @@ -145,7 +154,7 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
await this.validateHmac(body, signatureHeader);
}

const postActionMap = this.getPostActionMap(body, workflowId, stepId, action);
const postActionMap = this.getPostActionMap(body, workflowId, stepId, action, agentId, agentEvent);
const getActionMap = this.getGetActionMap(workflowId, stepId);

if (method === HttpMethodEnum.POST) {
Expand All @@ -171,7 +180,9 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
body: any,
workflowId: string,
stepId: string,
action: string
action: string,
agentId: string,
agentEvent: string
): Record<PostActionEnum, () => Promise<IActionResponse>> {
return {
[PostActionEnum.TRIGGER]: this.triggerAction({ workflowId, ...body }),
Expand All @@ -195,6 +206,21 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {

return this.createResponse(HttpStatusEnum.OK, result);
},
[PostActionEnum.AGENT_EVENT]: async () => {
const registeredAgent = this.client.getAgent(agentId);

if (!registeredAgent) {
return this.createResponse(HttpStatusEnum.NOT_FOUND, { error: `Agent '${agentId}' not registered` });
}

const ctx = new AgentContextImpl(body as AgentBridgeRequest, this.client.secretKey);

this.runAgentHandler(registeredAgent, agentEvent, ctx).catch((err) => {
console.error(`[agent:${agentId}] Handler error:`, err);
});

return this.createResponse(HttpStatusEnum.OK, { status: 'ack' });
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
},
};
}

Expand Down Expand Up @@ -264,6 +290,18 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
}
}

private async runAgentHandler(registeredAgent: Agent, event: string, ctx: AgentContextImpl): Promise<void> {
if (event === AgentEventEnum.ON_RESOLVE) {
if (registeredAgent.handlers.onResolve) {
await registeredAgent.handlers.onResolve(ctx);
}
} else {
await registeredAgent.handlers.onMessage(ctx);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

await ctx.flush();
}

private handleError(error: unknown): IActionResponse {
if (isFrameworkError(error)) {
if (error.statusCode >= 500) {
Expand Down
7 changes: 6 additions & 1 deletion packages/framework/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
export { Client } from './client';
export { CronExpression } from './constants';
export { NovuRequestHandler, type ServeHandlerOptions } from './handler';
export { workflow } from './resources';
export { agent, workflow } from './resources';
export type {
Agent,
AgentContext,
AgentHandlers,
} from './resources';
export type {
AnyStepResolver,
ChatStepResolver,
Expand Down
Loading
Loading