Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions apps/api/src/app/agents/dtos/agent-event.enum.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
export enum AgentEventEnum {
ON_START = 'onStart',
ON_MESSAGE = 'onMessage',
ON_ACTION = 'onAction',
ON_RESOLVE = 'onResolve',
}
4 changes: 2 additions & 2 deletions apps/api/src/app/agents/e2e/agent-webhook.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('Agent Webhook - inbound flow #novu-v2', () => {
});
});

async function invokeInbound(threadId: string, message: ReturnType<typeof mockMessage>, event = AgentEventEnum.ON_START) {
async function invokeInbound(threadId: string, message: ReturnType<typeof mockMessage>, event = AgentEventEnum.ON_MESSAGE) {
Comment thread
coderabbitai[bot] marked this conversation as resolved.
const config = await credentialService.resolve(ctx.agentId, ctx.integrationIdentifier);
const thread = mockThread(threadId);
await inboundHandler.handle(ctx.agentId, config, thread as any, message as any, event);
Expand Down Expand Up @@ -220,7 +220,7 @@ describe('Agent Webhook - inbound flow #novu-v2', () => {
expect(bridgeCalls.length).to.equal(1);
const call = bridgeCalls[0];

expect(call.event).to.equal(AgentEventEnum.ON_START);
expect(call.event).to.equal(AgentEventEnum.ON_MESSAGE);
expect(call.config.agentIdentifier).to.equal(ctx.agentIdentifier);
expect(call.config.integrationIdentifier).to.equal(ctx.integrationIdentifier);
expect(call.config.platform).to.equal('slack');
Expand Down
2 changes: 1 addition & 1 deletion apps/api/src/app/agents/e2e/helpers/agent-test-setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export async function setupAgentTestContext(): Promise<AgentTestContext> {
_organizationId: session.organization._id,
providerId: ChatProviderIdEnum.Slack,
channel: ChannelTypeEnum.CHAT,
credentials: encryptCredentials({ apiKey: SIGNING_SECRET }),
credentials: encryptCredentials({ signingSecret: SIGNING_SECRET }),
active: true,
name: 'Slack Agent E2E',
identifier: `slack-agent-e2e-${Date.now()}`,
Expand Down
76 changes: 76 additions & 0 deletions apps/api/src/app/agents/e2e/mock-agent-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Agent Handler — E2E test utility using @novu/framework
*
* This is a real serve() endpoint that uses the agent SDK to handle bridge calls.
* Run alongside the Novu API to test the full agent round-trip with Slack.
*
* Usage:
* NOVU_SECRET_KEY=<your-env-secret-key> npx ts-node apps/api/src/app/agents/e2e/mock-agent-handler.ts
*
* Setup:
* 1. Start Novu API: pnpm start:api:dev
* 2. Set environment bridge URL to http://localhost:4111/api/novu (dashboard or direct DB update)
* 3. Create an agent + link a Slack integration via the API
* 4. Point Slack event subscriptions to your Novu webhook URL (ngrok/tunnel)
* 5. @mention the bot in Slack — watch the round-trip in the logs
*/

import { agent, Client, serve } from '@novu/framework/express';
import express from 'express';

const NOVU_SECRET_KEY = process.env.NOVU_SECRET_KEY;
const PORT = Number(process.env.MOCK_PORT) || 4111;

if (!NOVU_SECRET_KEY) {
console.error('NOVU_SECRET_KEY is required. Set it to your environment secret key.');
process.exit(1);
}

const echoBot = agent('novu-agent', {
onMessage: async (ctx) => {
console.log('\n─────────────────────────────────────────');
console.log(`[${ctx.event}] from ${ctx.subscriber?.firstName ?? 'unknown'} on ${ctx.platform}`);
console.log(`Message: ${ctx.message?.text ?? '(none)'}`);
console.log(`Conversation: ${ctx.conversation.identifier} (${ctx.conversation.status})`);
console.log(`History: ${ctx.history.length} entries`);
console.log('─────────────────────────────────────────');

const userText = ctx.message?.text ?? '';
const turnCount = (ctx.conversation.metadata?.turnCount as number) ?? 0;

ctx.metadata.set('turnCount', turnCount + 1);

if (userText.toLowerCase().includes('done')) {
ctx.resolve(`Conversation resolved after ${turnCount + 1} turns`);
await ctx.reply('Thanks for chatting! Resolving this conversation.');

return;
}

await ctx.reply(`Echo: ${userText}`);
},

onResolve: async (ctx) => {
console.log(`\n[onResolve] Conversation ${ctx.conversation.identifier} closed.`);
ctx.metadata.set('resolvedAt', new Date().toISOString());
},
});

const app = express();
app.use(express.json());

app.use(
'/api/novu',
serve({
agents: [echoBot],
client: new Client({
secretKey: NOVU_SECRET_KEY,
strictAuthentication: false,
}),
})
);

app.listen(PORT, () => {
console.log(`\nAgent Handler (using @novu/framework) running on http://localhost:${PORT}/api/novu`);
console.log('\nWaiting for bridge calls...\n');
});
2 changes: 1 addition & 1 deletion apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ export class ChatSdkService implements OnModuleDestroy {
chat.onNewMention(async (thread: Thread, message: Message) => {
try {
await thread.subscribe();
await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_START);
await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_MESSAGE);
} catch (err) {
this.logger.error(err, `[agent:${agentId}] Error handling new mention`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ const integrationSchema = new Schema<IntegrationDBModel>(
senderId: Schema.Types.String,
servicePlanId: Schema.Types.String,
tenantId: Schema.Types.String,
signingSecret: Schema.Types.String,
AppIOBaseUrl: Schema.Types.String,
AppIOSubscriptionId: Schema.Types.String,
AppIOBearerToken: Schema.Types.String,
Expand Down
30 changes: 21 additions & 9 deletions packages/framework/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
StepNotFoundError,
WorkflowNotFoundError,
} from './errors';
import type { Agent } from './resources/agent';
import { mockSchema } from './jsonSchemaFaker';
import { prettyPrintDiscovery } from './resources/workflow/pretty-print-discovery';
import type {
Expand Down Expand Up @@ -52,6 +53,7 @@ function isRuntimeInDevelopment() {
export class Client {
private discoveredWorkflows = new Map<string, DiscoverWorkflowOutput>();
private discoverWorkflowPromises = new Map<string, Promise<void>>();
private registeredAgents = new Map<string, Agent>();

private templateEngine: Liquid;

Expand Down Expand Up @@ -128,6 +130,16 @@ export class Client {
}
}

public addAgents(agents: Array<Agent>): void {
for (const a of agents) {
this.registeredAgents.set(a.id, a);
}
}

public getAgent(agentId: string): Agent | undefined {
return this.registeredAgents.get(agentId);
}

private async addWorkflow(workflow: Workflow): Promise<void> {
try {
const definition = await workflow.discover();
Expand Down Expand Up @@ -402,12 +414,12 @@ export class Client {
}

public async executeWorkflow(event: Event): Promise<ExecuteOutput> {
const actionMessages = {
const actionMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executing',
[PostActionEnum.PREVIEW]: 'Previewing',
} as const;
};

const actionMessage = actionMessages[event.action];
const actionMessage = actionMessages[event.action] || event.action;

const actionMessageFormatted = `${actionMessage} workflowId:`;
this.log(`\n${log.bold(log.underline(actionMessageFormatted))} '${event.workflowId}'`);
Expand Down Expand Up @@ -495,11 +507,11 @@ export class Client {
const elapsedTimeInMilliseconds = elapsedSeconds * 1_000 + elapsedNanoseconds / 1_000_000;

const emoji = executionError ? EMOJI.ERROR : EMOJI.SUCCESS;
const resultMessages = {
const resultMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executed',
[PostActionEnum.PREVIEW]: 'Previewed',
} as const;
const resultMessage = resultMessages[event.action];
};
const resultMessage = resultMessages[event.action] || event.action;

this.log(`${emoji} ${resultMessage} workflowId: \`${event.workflowId}\``);

Expand Down Expand Up @@ -547,11 +559,11 @@ export class Client {
if (!this.verbose) return;

const successPrefix = error ? EMOJI.ERROR : EMOJI.SUCCESS;
const actionMessages = {
const actionMessages: Record<string, string> = {
[PostActionEnum.EXECUTE]: 'Executed',
[PostActionEnum.PREVIEW]: 'Previewed',
} as const;
const actionMessage = actionMessages[event.action];
};
const actionMessage = actionMessages[event.action] || event.action;
const message = error ? 'Failed to execute' : actionMessage;
const executionLog = error ? log.error : log.success;
const logMessage = `${successPrefix} ${message} workflowId: '${event.workflowId}`;
Expand Down
1 change: 1 addition & 0 deletions packages/framework/src/constants/action.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export enum PostActionEnum {
TRIGGER = 'trigger',
EXECUTE = 'execute',
PREVIEW = 'preview',
AGENT_EVENT = 'agent-event',
}

export enum GetActionEnum {
Expand Down
2 changes: 2 additions & 0 deletions packages/framework/src/constants/http-query.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ export enum HttpQueryKeysEnum {
STEP_ID = 'stepId',
ACTION = 'action',
SOURCE = 'source',
AGENT_ID = 'agentId',
EVENT = 'event',
}
60 changes: 53 additions & 7 deletions packages/framework/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ import {
SigningKeyNotFoundError,
} from './errors';
import { isPlatformError } from './errors/guard.errors';
import { AgentContextImpl, AgentEventEnum } from './resources/agent';
import type { Agent, AgentBridgeRequest } from './resources/agent';
import type { Awaitable, EventTriggerParams, Workflow } from './types';
import { createHmacSubtle, initApiClient } from './utils';

export type ServeHandlerOptions = {
export interface ServeHandlerOptions {
client?: Client;
workflows: Array<Workflow>;
};
workflows?: Array<Workflow>;
agents?: Array<Agent>;
}

export type INovuRequestHandlerOptions<Input extends any[] = any[], Output = any> = ServeHandlerOptions & {
frameworkName: string;
client?: Client;
workflows: Array<Workflow>;
workflows?: Array<Workflow>;
agents?: Array<Agent>;
handler: Handler<Input, Output>;
};

Expand All @@ -45,6 +49,7 @@ type HandlerResponse<Output = any> = {
queryString?: (key: string, url: URL) => Awaitable<string | null | undefined>;
url: () => Awaitable<URL>;
transformResponse: (res: IActionResponse<string>) => Output;
waitUntil?: (promise: Promise<unknown>) => void;
};

export type IActionResponse<TBody extends string = string> = {
Expand All @@ -62,14 +67,17 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
private readonly hmacEnabled: boolean;
private readonly http;
private readonly workflows: Array<Workflow>;
private readonly agents: Array<Agent>;

constructor(options: INovuRequestHandlerOptions<Input, Output>) {
this.handler = options.handler;
this.client = options.client ? options.client : new Client();
this.workflows = options.workflows;
this.workflows = options.workflows || [];
this.agents = options.agents || [];
this.http = initApiClient(this.client.secretKey, this.client.apiUrl);
this.frameworkName = options.frameworkName;
this.hmacEnabled = this.client.strictAuthentication;
this.client.addAgents(this.agents);
}

public createHandler(): (...args: Input) => Promise<Output> {
Expand Down Expand Up @@ -129,6 +137,8 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
const action = url.searchParams.get(HttpQueryKeysEnum.ACTION) || GetActionEnum.HEALTH_CHECK;
const workflowId = url.searchParams.get(HttpQueryKeysEnum.WORKFLOW_ID) || '';
const stepId = url.searchParams.get(HttpQueryKeysEnum.STEP_ID) || '';
const agentId = url.searchParams.get(HttpQueryKeysEnum.AGENT_ID) || '';
const agentEvent = url.searchParams.get(HttpQueryKeysEnum.EVENT) || '';
const signatureHeader = (await actions.headers(HttpHeaderKeysEnum.NOVU_SIGNATURE)) || '';

let body: Record<string, unknown> = {};
Expand All @@ -145,7 +155,7 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
await this.validateHmac(body, signatureHeader);
}

const postActionMap = this.getPostActionMap(body, workflowId, stepId, action);
const postActionMap = this.getPostActionMap(body, workflowId, stepId, action, agentId, agentEvent, actions.waitUntil);
const getActionMap = this.getGetActionMap(workflowId, stepId);

if (method === HttpMethodEnum.POST) {
Expand All @@ -171,7 +181,10 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
body: any,
workflowId: string,
stepId: string,
action: string
action: string,
agentId: string,
agentEvent: string,
waitUntil?: (promise: Promise<unknown>) => void
): Record<PostActionEnum, () => Promise<IActionResponse>> {
return {
[PostActionEnum.TRIGGER]: this.triggerAction({ workflowId, ...body }),
Expand All @@ -195,6 +208,25 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {

return this.createResponse(HttpStatusEnum.OK, result);
},
[PostActionEnum.AGENT_EVENT]: async () => {
const registeredAgent = this.client.getAgent(agentId);

if (!registeredAgent) {
return this.createResponse(HttpStatusEnum.NOT_FOUND, { error: `Agent '${agentId}' not registered` });
}

const ctx = new AgentContextImpl(body as AgentBridgeRequest, this.client.secretKey);

const handlerPromise = this.runAgentHandler(registeredAgent, agentEvent, ctx).catch((err) => {
console.error(`[agent:${agentId}] Handler error:`, err);
});

if (waitUntil) {
waitUntil(handlerPromise);
}

return this.createResponse(HttpStatusEnum.OK, { status: 'ack' });
},
};
}

Expand Down Expand Up @@ -264,6 +296,20 @@ export class NovuRequestHandler<Input extends any[] = any[], Output = any> {
}
}

private async runAgentHandler(registeredAgent: Agent, event: string, ctx: AgentContextImpl): Promise<void> {
if (event === AgentEventEnum.ON_RESOLVE) {
if (registeredAgent.handlers.onResolve) {
await registeredAgent.handlers.onResolve(ctx);
}
} else if (event === AgentEventEnum.ON_MESSAGE) {
await registeredAgent.handlers.onMessage(ctx);
} else {
throw new InvalidActionError(event, AgentEventEnum);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

await ctx.flush();
}

private handleError(error: unknown): IActionResponse {
if (isFrameworkError(error)) {
if (error.statusCode >= 500) {
Expand Down
7 changes: 6 additions & 1 deletion packages/framework/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
export { Client } from './client';
export { CronExpression } from './constants';
export { NovuRequestHandler, type ServeHandlerOptions } from './handler';
export { workflow } from './resources';
export { agent, workflow } from './resources';
export type {
Agent,
AgentContext,
AgentHandlers,
} from './resources';
export type {
AnyStepResolver,
ChatStepResolver,
Expand Down
Loading
Loading