diff --git a/apps/api/src/app/agents/dtos/agent-event.enum.ts b/apps/api/src/app/agents/dtos/agent-event.enum.ts index 7fac2ce814b..635d6bce6aa 100644 --- a/apps/api/src/app/agents/dtos/agent-event.enum.ts +++ b/apps/api/src/app/agents/dtos/agent-event.enum.ts @@ -1,6 +1,4 @@ export enum AgentEventEnum { - ON_START = 'onStart', ON_MESSAGE = 'onMessage', - ON_ACTION = 'onAction', ON_RESOLVE = 'onResolve', } diff --git a/apps/api/src/app/agents/e2e/agent-webhook.e2e.ts b/apps/api/src/app/agents/e2e/agent-webhook.e2e.ts index fc1db6039c2..106d56292ff 100644 --- a/apps/api/src/app/agents/e2e/agent-webhook.e2e.ts +++ b/apps/api/src/app/agents/e2e/agent-webhook.e2e.ts @@ -67,7 +67,7 @@ describe('Agent Webhook - inbound flow #novu-v2', () => { }); }); - async function invokeInbound(threadId: string, message: ReturnType, event = AgentEventEnum.ON_START) { + async function invokeInbound(threadId: string, message: ReturnType, event = AgentEventEnum.ON_MESSAGE) { const config = await credentialService.resolve(ctx.agentId, ctx.integrationIdentifier); const thread = mockThread(threadId); await inboundHandler.handle(ctx.agentId, config, thread as any, message as any, event); @@ -220,7 +220,7 @@ describe('Agent Webhook - inbound flow #novu-v2', () => { expect(bridgeCalls.length).to.equal(1); const call = bridgeCalls[0]; - expect(call.event).to.equal(AgentEventEnum.ON_START); + expect(call.event).to.equal(AgentEventEnum.ON_MESSAGE); expect(call.config.agentIdentifier).to.equal(ctx.agentIdentifier); expect(call.config.integrationIdentifier).to.equal(ctx.integrationIdentifier); expect(call.config.platform).to.equal('slack'); diff --git a/apps/api/src/app/agents/e2e/helpers/agent-test-setup.ts b/apps/api/src/app/agents/e2e/helpers/agent-test-setup.ts index 022c0bd17e5..d4699c3c9e7 100644 --- a/apps/api/src/app/agents/e2e/helpers/agent-test-setup.ts +++ b/apps/api/src/app/agents/e2e/helpers/agent-test-setup.ts @@ -52,7 +52,7 @@ export async function setupAgentTestContext(): Promise { _organizationId: session.organization._id, providerId: ChatProviderIdEnum.Slack, channel: ChannelTypeEnum.CHAT, - credentials: encryptCredentials({ apiKey: SIGNING_SECRET }), + credentials: encryptCredentials({ signingSecret: SIGNING_SECRET }), active: true, name: 'Slack Agent E2E', identifier: `slack-agent-e2e-${Date.now()}`, diff --git a/apps/api/src/app/agents/e2e/mock-agent-handler.ts b/apps/api/src/app/agents/e2e/mock-agent-handler.ts new file mode 100644 index 00000000000..368182dc54e --- /dev/null +++ b/apps/api/src/app/agents/e2e/mock-agent-handler.ts @@ -0,0 +1,76 @@ +/** + * Agent Handler — E2E test utility using @novu/framework + * + * This is a real serve() endpoint that uses the agent SDK to handle bridge calls. + * Run alongside the Novu API to test the full agent round-trip with Slack. + * + * Usage: + * NOVU_SECRET_KEY= npx ts-node apps/api/src/app/agents/e2e/mock-agent-handler.ts + * + * Setup: + * 1. Start Novu API: pnpm start:api:dev + * 2. Set environment bridge URL to http://localhost:4111/api/novu (dashboard or direct DB update) + * 3. Create an agent + link a Slack integration via the API + * 4. Point Slack event subscriptions to your Novu webhook URL (ngrok/tunnel) + * 5. @mention the bot in Slack — watch the round-trip in the logs + */ + +import { agent, Client, serve } from '@novu/framework/express'; +import express from 'express'; + +const NOVU_SECRET_KEY = process.env.NOVU_SECRET_KEY; +const PORT = Number(process.env.MOCK_PORT) || 4111; + +if (!NOVU_SECRET_KEY) { + console.error('NOVU_SECRET_KEY is required. Set it to your environment secret key.'); + process.exit(1); +} + +const echoBot = agent('novu-agent', { + onMessage: async (ctx) => { + console.log('\n─────────────────────────────────────────'); + console.log(`[${ctx.event}] from ${ctx.subscriber?.firstName ?? 'unknown'} on ${ctx.platform}`); + console.log(`Message: ${ctx.message?.text ?? '(none)'}`); + console.log(`Conversation: ${ctx.conversation.identifier} (${ctx.conversation.status})`); + console.log(`History: ${ctx.history.length} entries`); + console.log('─────────────────────────────────────────'); + + const userText = ctx.message?.text ?? ''; + const turnCount = (ctx.conversation.metadata?.turnCount as number) ?? 0; + + ctx.metadata.set('turnCount', turnCount + 1); + + if (userText.toLowerCase().includes('done')) { + ctx.resolve(`Conversation resolved after ${turnCount + 1} turns`); + await ctx.reply('Thanks for chatting! Resolving this conversation.'); + + return; + } + + await ctx.reply(`Echo: ${userText}`); + }, + + onResolve: async (ctx) => { + console.log(`\n[onResolve] Conversation ${ctx.conversation.identifier} closed.`); + ctx.metadata.set('resolvedAt', new Date().toISOString()); + }, +}); + +const app = express(); +app.use(express.json()); + +app.use( + '/api/novu', + serve({ + agents: [echoBot], + client: new Client({ + secretKey: NOVU_SECRET_KEY, + strictAuthentication: false, + }), + }) +); + +app.listen(PORT, () => { + console.log(`\nAgent Handler (using @novu/framework) running on http://localhost:${PORT}/api/novu`); + console.log('\nWaiting for bridge calls...\n'); +}); diff --git a/apps/api/src/app/agents/services/chat-sdk.service.ts b/apps/api/src/app/agents/services/chat-sdk.service.ts index 534684c8a04..e40159c31ec 100644 --- a/apps/api/src/app/agents/services/chat-sdk.service.ts +++ b/apps/api/src/app/agents/services/chat-sdk.service.ts @@ -224,7 +224,7 @@ export class ChatSdkService implements OnModuleDestroy { chat.onNewMention(async (thread: Thread, message: Message) => { try { await thread.subscribe(); - await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_START); + await this.inboundHandler.handle(agentId, config, thread, message, AgentEventEnum.ON_MESSAGE); } catch (err) { this.logger.error(err, `[agent:${agentId}] Error handling new mention`); } diff --git a/libs/dal/src/repositories/integration/integration.schema.ts b/libs/dal/src/repositories/integration/integration.schema.ts index 43541996a20..7113c64a25b 100644 --- a/libs/dal/src/repositories/integration/integration.schema.ts +++ b/libs/dal/src/repositories/integration/integration.schema.ts @@ -66,6 +66,7 @@ const integrationSchema = new Schema( senderId: Schema.Types.String, servicePlanId: Schema.Types.String, tenantId: Schema.Types.String, + signingSecret: Schema.Types.String, AppIOBaseUrl: Schema.Types.String, AppIOSubscriptionId: Schema.Types.String, AppIOBearerToken: Schema.Types.String, diff --git a/packages/framework/src/client.ts b/packages/framework/src/client.ts index f7581ac5a87..551e80f592e 100644 --- a/packages/framework/src/client.ts +++ b/packages/framework/src/client.ts @@ -18,6 +18,7 @@ import { StepNotFoundError, WorkflowNotFoundError, } from './errors'; +import type { Agent } from './resources/agent'; import { mockSchema } from './jsonSchemaFaker'; import { prettyPrintDiscovery } from './resources/workflow/pretty-print-discovery'; import type { @@ -52,6 +53,7 @@ function isRuntimeInDevelopment() { export class Client { private discoveredWorkflows = new Map(); private discoverWorkflowPromises = new Map>(); + private registeredAgents = new Map(); private templateEngine: Liquid; @@ -128,6 +130,16 @@ export class Client { } } + public addAgents(agents: Array): void { + for (const a of agents) { + this.registeredAgents.set(a.id, a); + } + } + + public getAgent(agentId: string): Agent | undefined { + return this.registeredAgents.get(agentId); + } + private async addWorkflow(workflow: Workflow): Promise { try { const definition = await workflow.discover(); @@ -402,12 +414,12 @@ export class Client { } public async executeWorkflow(event: Event): Promise { - const actionMessages = { + const actionMessages: Record = { [PostActionEnum.EXECUTE]: 'Executing', [PostActionEnum.PREVIEW]: 'Previewing', - } as const; + }; - const actionMessage = actionMessages[event.action]; + const actionMessage = actionMessages[event.action] || event.action; const actionMessageFormatted = `${actionMessage} workflowId:`; this.log(`\n${log.bold(log.underline(actionMessageFormatted))} '${event.workflowId}'`); @@ -495,11 +507,11 @@ export class Client { const elapsedTimeInMilliseconds = elapsedSeconds * 1_000 + elapsedNanoseconds / 1_000_000; const emoji = executionError ? EMOJI.ERROR : EMOJI.SUCCESS; - const resultMessages = { + const resultMessages: Record = { [PostActionEnum.EXECUTE]: 'Executed', [PostActionEnum.PREVIEW]: 'Previewed', - } as const; - const resultMessage = resultMessages[event.action]; + }; + const resultMessage = resultMessages[event.action] || event.action; this.log(`${emoji} ${resultMessage} workflowId: \`${event.workflowId}\``); @@ -547,11 +559,11 @@ export class Client { if (!this.verbose) return; const successPrefix = error ? EMOJI.ERROR : EMOJI.SUCCESS; - const actionMessages = { + const actionMessages: Record = { [PostActionEnum.EXECUTE]: 'Executed', [PostActionEnum.PREVIEW]: 'Previewed', - } as const; - const actionMessage = actionMessages[event.action]; + }; + const actionMessage = actionMessages[event.action] || event.action; const message = error ? 'Failed to execute' : actionMessage; const executionLog = error ? log.error : log.success; const logMessage = `${successPrefix} ${message} workflowId: '${event.workflowId}`; diff --git a/packages/framework/src/constants/action.constants.ts b/packages/framework/src/constants/action.constants.ts index df503ad6ce6..d95ccb6bf81 100644 --- a/packages/framework/src/constants/action.constants.ts +++ b/packages/framework/src/constants/action.constants.ts @@ -2,6 +2,7 @@ export enum PostActionEnum { TRIGGER = 'trigger', EXECUTE = 'execute', PREVIEW = 'preview', + AGENT_EVENT = 'agent-event', } export enum GetActionEnum { diff --git a/packages/framework/src/constants/http-query.constants.ts b/packages/framework/src/constants/http-query.constants.ts index 359f0c11302..91933d821df 100644 --- a/packages/framework/src/constants/http-query.constants.ts +++ b/packages/framework/src/constants/http-query.constants.ts @@ -3,4 +3,6 @@ export enum HttpQueryKeysEnum { STEP_ID = 'stepId', ACTION = 'action', SOURCE = 'source', + AGENT_ID = 'agentId', + EVENT = 'event', } diff --git a/packages/framework/src/handler.ts b/packages/framework/src/handler.ts index 8af89dd8092..b217999bcb7 100644 --- a/packages/framework/src/handler.ts +++ b/packages/framework/src/handler.ts @@ -21,18 +21,22 @@ import { SigningKeyNotFoundError, } from './errors'; import { isPlatformError } from './errors/guard.errors'; +import { AgentContextImpl, AgentEventEnum } from './resources/agent'; +import type { Agent, AgentBridgeRequest } from './resources/agent'; import type { Awaitable, EventTriggerParams, Workflow } from './types'; import { createHmacSubtle, initApiClient } from './utils'; -export type ServeHandlerOptions = { +export interface ServeHandlerOptions { client?: Client; - workflows: Array; -}; + workflows?: Array; + agents?: Array; +} export type INovuRequestHandlerOptions = ServeHandlerOptions & { frameworkName: string; client?: Client; - workflows: Array; + workflows?: Array; + agents?: Array; handler: Handler; }; @@ -45,6 +49,7 @@ type HandlerResponse = { queryString?: (key: string, url: URL) => Awaitable; url: () => Awaitable; transformResponse: (res: IActionResponse) => Output; + waitUntil?: (promise: Promise) => void; }; export type IActionResponse = { @@ -62,14 +67,17 @@ export class NovuRequestHandler { private readonly hmacEnabled: boolean; private readonly http; private readonly workflows: Array; + private readonly agents: Array; constructor(options: INovuRequestHandlerOptions) { this.handler = options.handler; this.client = options.client ? options.client : new Client(); - this.workflows = options.workflows; + this.workflows = options.workflows || []; + this.agents = options.agents || []; this.http = initApiClient(this.client.secretKey, this.client.apiUrl); this.frameworkName = options.frameworkName; this.hmacEnabled = this.client.strictAuthentication; + this.client.addAgents(this.agents); } public createHandler(): (...args: Input) => Promise { @@ -129,6 +137,8 @@ export class NovuRequestHandler { const action = url.searchParams.get(HttpQueryKeysEnum.ACTION) || GetActionEnum.HEALTH_CHECK; const workflowId = url.searchParams.get(HttpQueryKeysEnum.WORKFLOW_ID) || ''; const stepId = url.searchParams.get(HttpQueryKeysEnum.STEP_ID) || ''; + const agentId = url.searchParams.get(HttpQueryKeysEnum.AGENT_ID) || ''; + const agentEvent = url.searchParams.get(HttpQueryKeysEnum.EVENT) || ''; const signatureHeader = (await actions.headers(HttpHeaderKeysEnum.NOVU_SIGNATURE)) || ''; let body: Record = {}; @@ -145,7 +155,7 @@ export class NovuRequestHandler { await this.validateHmac(body, signatureHeader); } - const postActionMap = this.getPostActionMap(body, workflowId, stepId, action); + const postActionMap = this.getPostActionMap(body, workflowId, stepId, action, agentId, agentEvent, actions.waitUntil); const getActionMap = this.getGetActionMap(workflowId, stepId); if (method === HttpMethodEnum.POST) { @@ -171,7 +181,10 @@ export class NovuRequestHandler { body: any, workflowId: string, stepId: string, - action: string + action: string, + agentId: string, + agentEvent: string, + waitUntil?: (promise: Promise) => void ): Record Promise> { return { [PostActionEnum.TRIGGER]: this.triggerAction({ workflowId, ...body }), @@ -195,6 +208,25 @@ export class NovuRequestHandler { return this.createResponse(HttpStatusEnum.OK, result); }, + [PostActionEnum.AGENT_EVENT]: async () => { + const registeredAgent = this.client.getAgent(agentId); + + if (!registeredAgent) { + return this.createResponse(HttpStatusEnum.NOT_FOUND, { error: `Agent '${agentId}' not registered` }); + } + + const ctx = new AgentContextImpl(body as AgentBridgeRequest, this.client.secretKey); + + const handlerPromise = this.runAgentHandler(registeredAgent, agentEvent, ctx).catch((err) => { + console.error(`[agent:${agentId}] Handler error:`, err); + }); + + if (waitUntil) { + waitUntil(handlerPromise); + } + + return this.createResponse(HttpStatusEnum.OK, { status: 'ack' }); + }, }; } @@ -264,6 +296,20 @@ export class NovuRequestHandler { } } + private async runAgentHandler(registeredAgent: Agent, event: string, ctx: AgentContextImpl): Promise { + if (event === AgentEventEnum.ON_RESOLVE) { + if (registeredAgent.handlers.onResolve) { + await registeredAgent.handlers.onResolve(ctx); + } + } else if (event === AgentEventEnum.ON_MESSAGE) { + await registeredAgent.handlers.onMessage(ctx); + } else { + throw new InvalidActionError(event, AgentEventEnum); + } + + await ctx.flush(); + } + private handleError(error: unknown): IActionResponse { if (isFrameworkError(error)) { if (error.statusCode >= 500) { diff --git a/packages/framework/src/index.ts b/packages/framework/src/index.ts index ad019bb50a6..0a583ac5d8f 100644 --- a/packages/framework/src/index.ts +++ b/packages/framework/src/index.ts @@ -1,7 +1,12 @@ export { Client } from './client'; export { CronExpression } from './constants'; export { NovuRequestHandler, type ServeHandlerOptions } from './handler'; -export { workflow } from './resources'; +export { agent, workflow } from './resources'; +export type { + Agent, + AgentContext, + AgentHandlers, +} from './resources'; export type { AnyStepResolver, ChatStepResolver, diff --git a/packages/framework/src/resources/agent/agent.context.ts b/packages/framework/src/resources/agent/agent.context.ts new file mode 100644 index 00000000000..25ad22b6aa1 --- /dev/null +++ b/packages/framework/src/resources/agent/agent.context.ts @@ -0,0 +1,132 @@ +import type { + AgentBridgeRequest, + AgentContext, + AgentConversation, + AgentHistoryEntry, + AgentMessage, + AgentPlatformContext, + AgentReplyPayload, + AgentSubscriber, + Signal, +} from './agent.types'; + +export class AgentContextImpl implements AgentContext { + readonly event: string; + readonly message: AgentMessage | null; + readonly conversation: AgentConversation; + readonly subscriber: AgentSubscriber | null; + readonly history: AgentHistoryEntry[]; + readonly platform: string; + readonly platformContext: AgentPlatformContext; + + readonly metadata: { set: (key: string, value: unknown) => void }; + + private _signals: Signal[] = []; + private _resolveSignal: { summary?: string } | null = null; + private readonly _replyUrl: string; + private readonly _conversationId: string; + private readonly _integrationIdentifier: string; + private readonly _secretKey: string; + + constructor(request: AgentBridgeRequest, secretKey: string) { + this.event = request.event; + this.message = request.message; + this.conversation = request.conversation; + this.subscriber = request.subscriber; + this.history = request.history; + this.platform = request.platform; + this.platformContext = request.platformContext; + + this._replyUrl = request.replyUrl; + this._conversationId = request.conversationId; + this._integrationIdentifier = request.integrationIdentifier; + this._secretKey = secretKey; + + this.metadata = { + set: (key: string, value: unknown) => { + this._signals.push({ type: 'metadata', key, value }); + }, + }; + } + + async reply(text: string): Promise { + const body: AgentReplyPayload = { + conversationId: this._conversationId, + integrationIdentifier: this._integrationIdentifier, + reply: { text }, + }; + + if (this._signals.length) { + body.signals = this._signals; + this._signals = []; + } + + if (this._resolveSignal) { + body.resolve = this._resolveSignal; + this._resolveSignal = null; + } + + await this._post(body); + } + + async update(text: string): Promise { + const body: AgentReplyPayload = { + conversationId: this._conversationId, + integrationIdentifier: this._integrationIdentifier, + update: { text }, + }; + + await this._post(body); + } + + resolve(summary?: string): void { + this._resolveSignal = { summary }; + } + + trigger(workflowId: string, opts?: { to?: string; payload?: Record }): void { + this._signals.push({ type: 'trigger', workflowId, ...opts }); + } + + /** + * Flush any remaining signals that weren't sent with reply(). + * Called internally after onResolve returns. + */ + async flush(): Promise { + if (!this._signals.length && !this._resolveSignal) { + return; + } + + const body: AgentReplyPayload = { + conversationId: this._conversationId, + integrationIdentifier: this._integrationIdentifier, + }; + + if (this._signals.length) { + body.signals = this._signals; + this._signals = []; + } + + if (this._resolveSignal) { + body.resolve = this._resolveSignal; + this._resolveSignal = null; + } + + await this._post(body); + } + + private async _post(body: AgentReplyPayload): Promise { + const response = await fetch(this._replyUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `ApiKey ${this._secretKey}`, + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + const text = await response.text().catch(() => ''); + throw new Error(`Agent reply failed (${response.status}): ${text}`); + } + } +} diff --git a/packages/framework/src/resources/agent/agent.resource.ts b/packages/framework/src/resources/agent/agent.resource.ts new file mode 100644 index 00000000000..788c26cdc66 --- /dev/null +++ b/packages/framework/src/resources/agent/agent.resource.ts @@ -0,0 +1,19 @@ +import type { Agent, AgentHandlers } from './agent.types'; + +/** + * Define a new conversational agent. + * + * @param agentId - Unique identifier matching the agent entity created in Novu (e.g. 'wine-bot') + * @param handlers - Handler functions for agent events + */ +export function agent(agentId: string, handlers: AgentHandlers): Agent { + if (!agentId) { + throw new Error('agent() requires a non-empty agentId'); + } + + if (!handlers?.onMessage) { + throw new Error(`agent('${agentId}') requires an onMessage handler`); + } + + return { id: agentId, handlers }; +} diff --git a/packages/framework/src/resources/agent/agent.test.ts b/packages/framework/src/resources/agent/agent.test.ts new file mode 100644 index 00000000000..62b3b684d3c --- /dev/null +++ b/packages/framework/src/resources/agent/agent.test.ts @@ -0,0 +1,323 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { Client } from '../../client'; +import { PostActionEnum } from '../../constants'; +import { NovuRequestHandler } from '../../handler'; +import { agent } from './agent.resource'; +import type { AgentBridgeRequest } from './agent.types'; + +function createMockBridgeRequest(overrides?: Partial): AgentBridgeRequest { + return { + version: 1, + timestamp: new Date().toISOString(), + deliveryId: 'del-123', + event: 'onMessage', + agentId: 'test-bot', + replyUrl: 'https://api.novu.co/v1/agents/test-bot/reply', + conversationId: 'conv-456', + integrationIdentifier: 'slack-main', + message: { + text: 'Hello bot!', + platformMessageId: 'msg-789', + author: { userId: 'u1', fullName: 'Alice', userName: 'alice', isBot: false }, + timestamp: new Date().toISOString(), + }, + conversation: { + identifier: 'conv-456', + status: 'active', + metadata: {}, + messageCount: 1, + createdAt: new Date().toISOString(), + lastActivityAt: new Date().toISOString(), + }, + subscriber: { + subscriberId: 'sub-001', + firstName: 'Alice', + email: 'alice@example.com', + }, + history: [], + platform: 'slack', + platformContext: { threadId: 't1', channelId: 'c1', isDM: false }, + ...overrides, + }; +} + +describe('agent()', () => { + it('should return an agent with id and handlers', () => { + const bot = agent('wine-bot', { onMessage: async () => {} }); + + expect(bot.id).toBe('wine-bot'); + expect(typeof bot.handlers.onMessage).toBe('function'); + }); + + it('should throw when agentId is empty', () => { + expect(() => agent('', { onMessage: async () => {} })).toThrow('non-empty agentId'); + }); + + it('should throw when onMessage is missing', () => { + expect(() => agent('wine-bot', {} as any)).toThrow('onMessage handler'); + }); +}); + +describe('agent dispatch via NovuRequestHandler', () => { + let client: Client; + let fetchMock: ReturnType; + const originalFetch = global.fetch; + + beforeEach(() => { + client = new Client({ secretKey: 'test-secret-key', strictAuthentication: false }); + fetchMock = vi.fn().mockResolvedValue({ ok: true, text: () => Promise.resolve('{}'), json: () => Promise.resolve({ status: 'ok' }) }); + global.fetch = fetchMock as typeof fetch; + }); + + afterEach(() => { + global.fetch = originalFetch; + }); + + it('should ACK immediately and run onMessage handler in background', async () => { + const onMessageSpy = vi.fn(async (ctx) => { + await ctx.reply('Echo: Hello bot!'); + }); + + const testBot = agent('test-bot', { onMessage: onMessageSpy }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest(); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onMessage`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + const result = await handler.createHandler()(); + const parsed = JSON.parse(result.body); + + expect(result.status).toBe(200); + expect(parsed.status).toBe('ack'); + + await vi.waitFor(() => expect(onMessageSpy).toHaveBeenCalledTimes(1)); + + const replyCall = fetchMock.mock.calls.find( + (call: any[]) => call[0] === 'https://api.novu.co/v1/agents/test-bot/reply' + ); + expect(replyCall).toBeDefined(); + + const replyBody = JSON.parse(replyCall![1].body); + expect(replyBody.reply.text).toBe('Echo: Hello bot!'); + expect(replyBody.conversationId).toBe('conv-456'); + expect(replyBody.integrationIdentifier).toBe('slack-main'); + + const replyHeaders = replyCall![1].headers; + expect(replyHeaders.Authorization).toBe('ApiKey test-secret-key'); + }); + + it('should return 404 for unknown agent', async () => { + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [], + client, + handler: () => { + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=unknown-bot&event=onMessage`); + + return { + body: () => ({}), + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + const result = await handler.createHandler()(); + + expect(result.status).toBe(404); + expect(JSON.parse(result.body).error).toContain('unknown-bot'); + }); + + it('should batch metadata signals with reply', async () => { + const testBot = agent('test-bot', { + onMessage: async (ctx) => { + ctx.metadata.set('turnCount', 1); + ctx.metadata.set('language', 'en'); + await ctx.reply('Got it'); + }, + }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest(); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onMessage`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + await handler.createHandler()(); + + await vi.waitFor(() => expect(fetchMock).toHaveBeenCalled()); + + const replyCall = fetchMock.mock.calls.find( + (call: any[]) => call[0] === 'https://api.novu.co/v1/agents/test-bot/reply' + ); + const replyBody = JSON.parse(replyCall![1].body); + + expect(replyBody.reply.text).toBe('Got it'); + expect(replyBody.signals).toHaveLength(2); + expect(replyBody.signals[0]).toEqual({ type: 'metadata', key: 'turnCount', value: 1 }); + expect(replyBody.signals[1]).toEqual({ type: 'metadata', key: 'language', value: 'en' }); + }); + + it('should send update independently without signals', async () => { + const testBot = agent('test-bot', { + onMessage: async (ctx) => { + ctx.metadata.set('step', 'thinking'); + await ctx.update('Thinking...'); + await ctx.reply('Done thinking'); + }, + }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest(); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onMessage`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + await handler.createHandler()(); + + await vi.waitFor(() => expect(fetchMock.mock.calls.length).toBeGreaterThanOrEqual(2)); + + const replyCalls = fetchMock.mock.calls.filter( + (call: any[]) => call[0] === 'https://api.novu.co/v1/agents/test-bot/reply' + ); + + const parsedBodies = replyCalls.map(([, init]: any[]) => JSON.parse(init.body)); + const updateBody = parsedBodies.find((body: any) => body.update); + const replyBody = parsedBodies.find((body: any) => body.reply); + + expect(updateBody).toBeDefined(); + expect(updateBody.update.text).toBe('Thinking...'); + expect(updateBody.signals).toBeUndefined(); + + expect(replyBody).toBeDefined(); + expect(replyBody.reply.text).toBe('Done thinking'); + expect(replyBody.signals).toHaveLength(1); + }); + + it('should flush remaining signals after onResolve', async () => { + const testBot = agent('test-bot', { + onMessage: async () => {}, + onResolve: async (ctx) => { + ctx.metadata.set('archived', true); + ctx.trigger('post-resolve-workflow', { payload: { reason: 'done' } }); + }, + }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest({ event: 'onResolve', message: null }); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onResolve`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + await handler.createHandler()(); + + await vi.waitFor(() => expect(fetchMock).toHaveBeenCalled()); + + const replyCall = fetchMock.mock.calls.find( + (call: any[]) => call[0] === 'https://api.novu.co/v1/agents/test-bot/reply' + ); + const flushBody = JSON.parse(replyCall![1].body); + + expect(flushBody.reply).toBeUndefined(); + expect(flushBody.signals).toHaveLength(2); + expect(flushBody.signals[0]).toEqual({ type: 'metadata', key: 'archived', value: true }); + expect(flushBody.signals[1]).toEqual({ + type: 'trigger', + workflowId: 'post-resolve-workflow', + payload: { reason: 'done' }, + }); + }); + + it('should provide read-only context properties from bridge payload', async () => { + let capturedCtx: any; + + const testBot = agent('test-bot', { + onMessage: async (ctx) => { + capturedCtx = ctx; + await ctx.reply('ok'); + }, + }); + + const handler = new NovuRequestHandler({ + frameworkName: 'test', + agents: [testBot], + client, + handler: () => { + const body = createMockBridgeRequest(); + const url = new URL(`http://localhost?action=${PostActionEnum.AGENT_EVENT}&agentId=test-bot&event=onMessage`); + + return { + body: () => body, + headers: () => null, + method: () => 'POST', + url: () => url, + transformResponse: (res: any) => res, + }; + }, + }); + + await handler.createHandler()(); + await vi.waitFor(() => expect(capturedCtx).toBeDefined()); + + expect(capturedCtx.event).toBe('onMessage'); + expect(capturedCtx.message?.text).toBe('Hello bot!'); + expect(capturedCtx.conversation.identifier).toBe('conv-456'); + expect(capturedCtx.subscriber?.subscriberId).toBe('sub-001'); + expect(capturedCtx.platform).toBe('slack'); + expect(capturedCtx.platformContext.threadId).toBe('t1'); + expect(capturedCtx.history).toEqual([]); + }); +}); diff --git a/packages/framework/src/resources/agent/agent.types.ts b/packages/framework/src/resources/agent/agent.types.ts new file mode 100644 index 00000000000..d3ed01cc52d --- /dev/null +++ b/packages/framework/src/resources/agent/agent.types.ts @@ -0,0 +1,119 @@ +export enum AgentEventEnum { + ON_MESSAGE = 'onMessage', + ON_RESOLVE = 'onResolve', +} + +// --------------------------------------------------------------------------- +// User-facing types (visible on ctx properties) +// --------------------------------------------------------------------------- + +export interface AgentMessageAuthor { + userId: string; + fullName: string; + userName: string; + isBot: boolean | 'unknown'; +} + +export interface AgentMessage { + text: string; + platformMessageId: string; + author: AgentMessageAuthor; + timestamp: string; +} + +export interface AgentConversation { + identifier: string; + status: string; + metadata: Record; + messageCount: number; + createdAt: string; + lastActivityAt: string; +} + +export interface AgentSubscriber { + subscriberId: string; + firstName?: string; + lastName?: string; + email?: string; + phone?: string; + avatar?: string; + locale?: string; + data?: Record; +} + +export interface AgentHistoryEntry { + role: string; + type: string; + content: string; + senderName?: string; + signalData?: { type: string; payload?: Record }; + createdAt: string; +} + +export interface AgentPlatformContext { + threadId: string; + channelId: string; + isDM: boolean; +} + +export interface AgentContext { + readonly event: string; + readonly message: AgentMessage | null; + readonly conversation: AgentConversation; + readonly subscriber: AgentSubscriber | null; + readonly history: AgentHistoryEntry[]; + readonly platform: string; + readonly platformContext: AgentPlatformContext; + + reply(text: string): Promise; + update(text: string): Promise; + resolve(summary?: string): void; + metadata: { + set(key: string, value: unknown): void; + }; + trigger(workflowId: string, opts?: { to?: string; payload?: Record }): void; +} + +export interface AgentHandlers { + onMessage: (ctx: AgentContext) => Promise; + onResolve?: (ctx: AgentContext) => Promise; +} + +export interface Agent { + id: string; + handlers: AgentHandlers; +} + +// --------------------------------------------------------------------------- +// Internal types (bridge protocol — not exposed to SDK consumers) +// --------------------------------------------------------------------------- + +export interface AgentBridgeRequest { + version: number; + timestamp: string; + deliveryId: string; + event: string; + agentId: string; + replyUrl: string; + conversationId: string; + integrationIdentifier: string; + message: AgentMessage | null; + conversation: AgentConversation; + subscriber: AgentSubscriber | null; + history: AgentHistoryEntry[]; + platform: string; + platformContext: AgentPlatformContext; +} + +export type MetadataSignal = { type: 'metadata'; key: string; value: unknown }; +export type TriggerSignal = { type: 'trigger'; workflowId: string; to?: string; payload?: Record }; +export type Signal = MetadataSignal | TriggerSignal; + +export interface AgentReplyPayload { + conversationId: string; + integrationIdentifier: string; + reply?: { text: string }; + update?: { text: string }; + resolve?: { summary?: string }; + signals?: Signal[]; +} diff --git a/packages/framework/src/resources/agent/index.ts b/packages/framework/src/resources/agent/index.ts new file mode 100644 index 00000000000..9f7a2439a91 --- /dev/null +++ b/packages/framework/src/resources/agent/index.ts @@ -0,0 +1,16 @@ +export { AgentContextImpl } from './agent.context'; +export { agent } from './agent.resource'; +export type { + Agent, + AgentBridgeRequest, + AgentContext, + AgentConversation, + AgentHandlers, + AgentHistoryEntry, + AgentMessage, + AgentMessageAuthor, + AgentPlatformContext, + AgentReplyPayload, + AgentSubscriber, +} from './agent.types'; +export { AgentEventEnum } from './agent.types'; diff --git a/packages/framework/src/resources/index.ts b/packages/framework/src/resources/index.ts index 6fba0ef33f9..4a8a0776c16 100644 --- a/packages/framework/src/resources/index.ts +++ b/packages/framework/src/resources/index.ts @@ -1 +1,2 @@ +export * from './agent'; export * from './workflow/workflow.resource';