refactor(api-service): clean up agent conversation architecture#10802
refactor(api-service): clean up agent conversation architecture#10802
Conversation
Break the circular dependency cycle (ChatSdkService ↔ AgentInboundHandler ↔ HandleAgentReply) by removing all forwardRef usage: - Expand AgentConversationService as the single authority for conversation state mutations (persist messages, edits, metadata, resolve) - Slim HandleAgentReply to orchestrate delivery + delegate persistence - Inline NoBridgeUrlError fallback in AgentInboundHandler instead of routing back through HandleAgentReply - Reuse ConversationChannel from DAL instead of redeclaring flat fields - Centralize channels[0] assumption into getPrimaryChannel() - Extract shared persistAgentActivity helper to reduce duplication Made-with: Cursor
✅ Deploy Preview for dashboard-v2-novu-staging canceled.
|
|
Hey there and thank you for opening this pull request! 👋 We require pull request titles to follow specific formatting rules and it looks like your proposed title needs to be adjusted. Your PR title is: Requirements:
Expected format: Details: PR title must end with 'fixes TICKET-ID' (e.g., 'fixes NOV-123') or include ticket ID in branch name |
📝 WalkthroughWalkthroughAdds AgentConversationService APIs for conversation/channel lookup, agent activity persistence (message/edit), metadata updates with size validation, and conversation resolution; refactors inbound/reply flows to delegate to the service and removes a forwardRef injection in ChatSdkService. Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client
participant Inbound as AgentInboundHandler
participant Thread as ChatSdkService / Thread
participant ConvSvc as AgentConversationService
participant Repo as ConversationRepository / ActivityRepo
rect rgba(173,216,230,0.5)
Client->>Inbound: inbound event (agent reply / reaction)
Inbound->>Thread: thread.post(...) when no bridge URL
Thread-->>Inbound: postedMessage (optional id)
end
rect rgba(144,238,144,0.5)
Inbound->>ConvSvc: persistAgentMessage / persistAgentEdit(params)
ConvSvc->>Repo: get/find conversation, setFirstPlatformMessageId?
ConvSvc->>Repo: create ConversationActivity, touchActivity/touchPreview
ConvSvc-->>Inbound: persisted ConversationActivity
end
rect rgba(255,228,196,0.5)
Inbound->>ConvSvc: updateMetadata(params) / resolveConversation(params)
ConvSvc->>Repo: merge metadata / update conversation status
ConvSvc->>Repo: create metadata/resolve signal activity
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts (1)
43-88:⚠️ Potential issue | 🟠 MajorValidate agent ownership before signals and resolve mutations.
Reply/edit paths call
resolveValidatedAgentNameForDelivery(), but signals-only and resolve-only commands mutate the conversation without checking thatcommand.agentIdentifierownsconversation._agentId. Line 43 scopes by environment/org/conversation only, so a mismatched agent identifier can still update metadata or resolve the conversation.🔒 Proposed fix
const channel = this.getPrimaryChannel(conversation); + const agentName = await this.resolveValidatedAgentNameForDelivery(command, conversation); if (command.edit) { - const agentName = await this.resolveValidatedAgentNameForDelivery(command, conversation); - return this.deliverEdit(command, conversation, channel, command.edit, agentName); } @@ let replyInfo: SentMessageInfo | undefined; if (command.reply) { - const agentName = await this.resolveValidatedAgentNameForDelivery(command, conversation); - replyInfo = await this.deliverMessage( command, conversation,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts` around lines 43 - 88, After loading the conversation, ensure agent ownership is validated for signal-only or resolve-only flows: when command.edit is falsy and either command.signals?.length or command.resolve is set, call resolveValidatedAgentNameForDelivery(command, conversation) (the same validator used for reply/edit) and use its result (or let it throw) before invoking executeSignals or resolveConversation; if validation fails, throw/propagate the existing Forbidden/NotFound behavior so unauthorized agents cannot mutate the conversation.
🧹 Nitpick comments (1)
apps/api/src/app/agents/services/agent-conversation.service.ts (1)
206-247: Add the required blank line before newreturnstatements.The new wrapper methods return immediately without the project-required blank line.
🎨 Proposed style fix
async getConversation( conversationId: string, environmentId: string, organizationId: string ): Promise<ConversationEntity | null> { + return this.conversationRepository.findOne( { _id: conversationId, _environmentId: environmentId, _organizationId: organizationId }, '*' @@ async findByPlatformThread( environmentId: string, organizationId: string, platformThreadId: string ): Promise<ConversationEntity | null> { + return this.conversationRepository.findByPlatformThread(environmentId, organizationId, platformThreadId); } @@ async persistAgentMessage(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> { + return this.persistAgentActivity(params, ConversationActivityTypeEnum.MESSAGE, 'activity'); } async persistAgentEdit(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> { + return this.persistAgentActivity(params, ConversationActivityTypeEnum.EDIT, 'preview'); }As per coding guidelines, “Include a blank line before every
returnstatement”.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api/src/app/agents/services/agent-conversation.service.ts` around lines 206 - 247, Each of the new wrapper methods are missing the project-required blank line before their immediate return statements; update getConversation, findByPlatformThread, setFirstPlatformMessageId, persistAgentMessage, and persistAgentEdit so there is a single blank line immediately preceding each return (or await return) to comply with the “blank line before every return” style rule.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/api/src/app/agents/services/agent-conversation.service.ts`:
- Around line 283-300: The updateMetadata method currently builds a full merged
object and calls conversationRepository.updateMetadata, which can cause lost
updates when concurrent requests start from the same currentMetadata snapshot;
modify the flow so the repository performs an atomic per-key patch or
optimistic-version check instead of blind replace: update the
signature/implementation of conversationRepository.updateMetadata (and any
backing store methods) to accept either a partial patch map (only the keys from
params.signals) and apply them atomically, or accept and verify a version/etag
from the caller before writing (fail on mismatch), and then call that new
repository method from updateMetadata (keep method name updateMetadata but pass
the signals or version info rather than the full merged payload).
- Around line 77-84: getPrimaryChannel currently indexes
conversation.channels[0] without guarding for conversation.channels being
undefined/null, causing a TypeError for malformed conversations; update
getPrimaryChannel to first verify conversation.channels exists and is an array
with length > 0 (e.g., check Array.isArray(conversation.channels) &&
conversation.channels.length > 0) and then return the first element, otherwise
throw the existing BadRequestException (`Conversation ${conversation._id} has no
channel`) so malformed input yields the intended error.
---
Outside diff comments:
In
`@apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts`:
- Around line 43-88: After loading the conversation, ensure agent ownership is
validated for signal-only or resolve-only flows: when command.edit is falsy and
either command.signals?.length or command.resolve is set, call
resolveValidatedAgentNameForDelivery(command, conversation) (the same validator
used for reply/edit) and use its result (or let it throw) before invoking
executeSignals or resolveConversation; if validation fails, throw/propagate the
existing Forbidden/NotFound behavior so unauthorized agents cannot mutate the
conversation.
---
Nitpick comments:
In `@apps/api/src/app/agents/services/agent-conversation.service.ts`:
- Around line 206-247: Each of the new wrapper methods are missing the
project-required blank line before their immediate return statements; update
getConversation, findByPlatformThread, setFirstPlatformMessageId,
persistAgentMessage, and persistAgentEdit so there is a single blank line
immediately preceding each return (or await return) to comply with the “blank
line before every return” style rule.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a51db14b-733a-4637-8b68-efb2b2354183
📒 Files selected for processing (4)
apps/api/src/app/agents/services/agent-conversation.service.tsapps/api/src/app/agents/services/agent-inbound-handler.service.tsapps/api/src/app/agents/services/chat-sdk.service.tsapps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts
… agent ownership on all paths - Add optional chaining on conversation.channels access in getPrimaryChannel to throw a clean BadRequestException instead of TypeError on malformed data - Hoist resolveValidatedAgentNameForDelivery to run unconditionally so signals-only and resolve-only requests also verify agent ownership Made-with: Cursor
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
apps/api/src/app/agents/services/agent-conversation.service.ts (1)
283-300:⚠️ Potential issue | 🟠 MajorAvoid lost metadata updates from stale conversation snapshots.
mergedis built fromparams.currentMetadata, then written back as a full replacement. Concurrent signal updates that start from the same conversation snapshot can overwrite each other. Prefer an atomic per-key patch inConversationRepository.updateMetadataor an optimistic version check before replacing the full object.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api/src/app/agents/services/agent-conversation.service.ts` around lines 283 - 300, The current updateMetadata method builds a full merged object from params.currentMetadata then replaces the conversation metadata via conversationRepository.updateMetadata, which can cause lost updates when concurrent requests use the same snapshot; modify the fix by changing the repository call to perform an atomic per-key patch (add/update only the keys from params.signals) or by adding an optimistic version check/compare-and-swap: keep using updateMetadata(params) but update ConversationRepository.updateMetadata to accept the signals array (or a version token) and perform a per-key merge in the DB transaction (or verify the conversation's metadata version matches params.currentVersion before replacing), and surface a conflict error so callers can retry; update references to merged and the call site in agent-conversation.service.updateMetadata to pass signals (or version) instead of blindly replacing the whole object.
🧹 Nitpick comments (1)
apps/api/src/app/agents/services/agent-conversation.service.ts (1)
206-214: Add the required blank line before changedreturnstatements.These changed wrappers return immediately after the signature/body opening. Please add a blank line before each
return. As per coding guidelines,Include a blank line before every return statement.Style-only adjustment
async getConversation( conversationId: string, environmentId: string, organizationId: string ): Promise<ConversationEntity | null> { + return this.conversationRepository.findOne( { _id: conversationId, _environmentId: environmentId, _organizationId: organizationId }, '*' ); } async findByPlatformThread( environmentId: string, organizationId: string, platformThreadId: string ): Promise<ConversationEntity | null> { + return this.conversationRepository.findByPlatformThread(environmentId, organizationId, platformThreadId); } async persistAgentMessage(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> { + return this.persistAgentActivity(params, ConversationActivityTypeEnum.MESSAGE, 'activity'); } async persistAgentEdit(params: PersistAgentActivityParams): Promise<ConversationActivityEntity> { + return this.persistAgentActivity(params, ConversationActivityTypeEnum.EDIT, 'preview'); }Also applies to: 217-223, 241-247
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api/src/app/agents/services/agent-conversation.service.ts` around lines 206 - 214, Add a blank line immediately before any early return statements in this file; specifically insert a blank line before the return inside getConversation (the return calling this.conversationRepository.findOne) and do the same for the other changed wrappers referenced (the return statements around lines 217-223 and 241-247). Locate the methods that call this.conversationRepository.findOne or otherwise return immediately after the signature/body opening and ensure there is one blank line separating the opening brace and the return statement in each function.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts`:
- Around line 52-56: The code unconditionally calls
getPrimaryChannel(conversation) and later enforces presence of
conversation.serializedThread even for non-reply flows (edit-only, resolve-only,
signals-only); restrict the serializedThread requirement and primary-channel
usage to the reply-delivery path only. Update execute() so you only call
getPrimaryChannel and validate conversation.serializedThread when handling
reply-delivery (e.g., before calling deliverReply/deliverEdit paths that require
platform thread), keep resolveValidatedAgentNameForDelivery and other non-reply
branches working without serializedThread, and adjust the guard logic around
deliverEdit, deliverReply, and any signals-only paths to avoid rejecting valid
non-reply commands. Ensure changes reference getPrimaryChannel,
resolveValidatedAgentNameForDelivery, deliverEdit, deliverReply, and the
serializedThread checks so reviewers can locate and verify the fix.
---
Duplicate comments:
In `@apps/api/src/app/agents/services/agent-conversation.service.ts`:
- Around line 283-300: The current updateMetadata method builds a full merged
object from params.currentMetadata then replaces the conversation metadata via
conversationRepository.updateMetadata, which can cause lost updates when
concurrent requests use the same snapshot; modify the fix by changing the
repository call to perform an atomic per-key patch (add/update only the keys
from params.signals) or by adding an optimistic version check/compare-and-swap:
keep using updateMetadata(params) but update
ConversationRepository.updateMetadata to accept the signals array (or a version
token) and perform a per-key merge in the DB transaction (or verify the
conversation's metadata version matches params.currentVersion before replacing),
and surface a conflict error so callers can retry; update references to merged
and the call site in agent-conversation.service.updateMetadata to pass signals
(or version) instead of blindly replacing the whole object.
---
Nitpick comments:
In `@apps/api/src/app/agents/services/agent-conversation.service.ts`:
- Around line 206-214: Add a blank line immediately before any early return
statements in this file; specifically insert a blank line before the return
inside getConversation (the return calling this.conversationRepository.findOne)
and do the same for the other changed wrappers referenced (the return statements
around lines 217-223 and 241-247). Locate the methods that call
this.conversationRepository.findOne or otherwise return immediately after the
signature/body opening and ensure there is one blank line separating the opening
brace and the return statement in each function.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 731c4453-7df0-4a46-a5f1-cabc69810a56
📒 Files selected for processing (2)
apps/api/src/app/agents/services/agent-conversation.service.tsapps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts
…very path only Move the serializedThread check from the unconditional getPrimaryChannel call to a targeted ensureSerializedThread guard that only runs before deliverMessage. Signals-only and resolve-only commands no longer reject conversations that haven't stored a serialized thread yet. Made-with: Cursor
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts (1)
64-83:⚠️ Potential issue | 🟠 MajorPreflight signals before sending replies.
When
replyis combined with invalid metadata signals,deliverMessage()posts externally and persists the reply beforevalidateMetadataSignalKeys()throws. Trigger signals are also accepted but silently ignored. Validate/reject all signals beforepostToConversationto avoid partial side effects and duplicate replies on retry.🐛 Suggested fix
let replyInfo: SentMessageInfo | undefined; + const metadataSignals = this.getMetadataSignals(command.signals); + this.validateMetadataSignalKeys(metadataSignals); + + const triggerSignals = (command.signals ?? []).filter((s) => s.type === 'trigger'); + if (triggerSignals.length) { + throw new BadRequestException('Trigger signals are not supported yet'); + } + if (command.reply) { this.ensureSerializedThread(channel); @@ if (command.signals?.length) { - await this.executeSignals(command, conversation, channel, command.signals); + await this.executeSignals(command, conversation, channel, command.signals, metadataSignals); } @@ private async executeSignals( command: HandleAgentReplyCommand, conversation: ConversationEntity, channel: ConversationChannel, - signals: HandleAgentReplyCommand['signals'] + signals: HandleAgentReplyCommand['signals'], + metadataSignals = this.getMetadataSignals(signals) ): Promise<void> { - const metadataSignals = (signals ?? []).filter( - (s): s is Extract<NonNullable<HandleAgentReplyCommand['signals']>[number], { type: 'metadata' }> => - s.type === 'metadata' - ); - if (metadataSignals.length) { - await this.validateMetadataSignalKeys(metadataSignals); await this.conversationService.updateMetadata({ conversationId: conversation._id, channel, @@ - const triggerSignals = (signals ?? []).filter((s) => s.type === 'trigger'); - if (triggerSignals.length) { - // TODO: execute trigger signals — requires wiring TriggerEvent or ParseEventRequest from EventsModule - } } + + private getMetadataSignals( + signals: HandleAgentReplyCommand['signals'] + ): Array<Extract<NonNullable<HandleAgentReplyCommand['signals']>[number], { type: 'metadata' }>> { + + return (signals ?? []).filter( + (s): s is Extract<NonNullable<HandleAgentReplyCommand['signals']>[number], { type: 'metadata' }> => + s.type === 'metadata' + ); + }As per coding guidelines,
apps/api/**: Check for proper error handling and input validation.Also applies to: 213-229
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts` around lines 64 - 83, Validate and reject all incoming signals before posting a reply so deliverMessage/postToConversation is not called when signals are invalid: in the handle-agent-reply flow, run the metadata signal validation (e.g., validateMetadataSignalKeys or the signal validation routine) on command.signals at the top of the reply branch (before calling deliverMessage or ensureSerializedThread) and throw/return an error when validation fails; also ensure trigger-type signals are explicitly handled (not silently ignored) so executeSignals is only called after successful validation; keep removeAckReaction behavior unchanged but ensure no external post/persist happens on invalid signals.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In
`@apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts`:
- Around line 64-83: Validate and reject all incoming signals before posting a
reply so deliverMessage/postToConversation is not called when signals are
invalid: in the handle-agent-reply flow, run the metadata signal validation
(e.g., validateMetadataSignalKeys or the signal validation routine) on
command.signals at the top of the reply branch (before calling deliverMessage or
ensureSerializedThread) and throw/return an error when validation fails; also
ensure trigger-type signals are explicitly handled (not silently ignored) so
executeSignals is only called after successful validation; keep
removeAckReaction behavior unchanged but ensure no external post/persist happens
on invalid signals.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 413099dc-86d2-4f92-9fbd-ca92e4a0af86
📒 Files selected for processing (1)
apps/api/src/app/agents/usecases/handle-agent-reply/handle-agent-reply.usecase.ts
Summary
forwardRefusage fromChatSdkService,AgentInboundHandler, andHandleAgentReply. The dependency graph is now a clean DAG.AgentConversationService— All conversation state mutations (persist messages, edits, metadata updates, resolve) now go through this single service instead of being scattered acrossHandleAgentReplyandAgentInboundHandler.ConversationChannelfrom DAL instead of redeclaring flat fields (platform,integrationId,platformThreadId). Extract sharedConversationActivityContextbase interface. Centralizechannels[0]assumption intogetPrimaryChannel().Before (circular dependencies)
graph LR ChatSdk -- "forwardRef" --> InboundHandler InboundHandler -- "forwardRef" --> HandleReply HandleReply -- "forwardRef" --> ChatSdk HandleReply --> ConversationRepo HandleReply --> ActivityRepo InboundHandler --> ConversationRepo InboundHandler --> ConversationService style ChatSdk fill:#f99 style InboundHandler fill:#f99 style HandleReply fill:#f99After (clean DAG)
graph LR ChatSdk --> InboundHandler InboundHandler --> ConversationService HandleReply --> ChatSdk HandleReply --> ConversationService ConversationService --> ConversationRepo ConversationService --> ActivityRepo style ConversationService fill:#9f9Inbound flow (webhook → bridge)
sequenceDiagram participant W as Webhook Controller participant SDK as ChatSdkService participant IH as AgentInboundHandler participant CS as AgentConversationService participant BE as BridgeExecutor W->>SDK: handleWebhook() SDK->>IH: handle(thread, message) IH->>CS: createOrGetConversation() IH->>CS: persistInboundMessage() IH->>CS: updateChannelThread() IH->>BE: execute(bridge call) Note over IH: NoBridgeUrlError? → thread.post() + CS.persistAgentMessage()Outbound flow (agent reply → platform)
sequenceDiagram participant C as API Controller participant HR as HandleAgentReply participant CS as AgentConversationService participant SDK as ChatSdkService C->>HR: execute(command) HR->>CS: getConversation() HR->>CS: getPrimaryChannel() HR->>SDK: postToConversation() HR->>CS: persistAgentMessage() Note over HR: signals? → CS.updateMetadata() Note over HR: resolve? → CS.resolveConversation()Test plan
agent-reply.e2e.ts,agent-webhook.e2e.ts,agents.e2e.ts) — tests operate at HTTP/DI level with stubbedChatSdkServiceandBridgeExecutorService, so the internal refactoring is transparentforwardReferrorsMade with Cursor
What changed
Refactors agent conversation handling by introducing AgentConversationService as the single place for conversation state mutations (persisting messages/edits, metadata updates, resolve) and removing forwardRef-based circular dependencies between ChatSdkService, AgentInboundHandler, and HandleAgentReply. The dependency graph is converted to a DAG, channel selection is centralized (getPrimaryChannel), and shared interfaces were added to standardize agent activity operations and reduce duplicated persistence logic.
Affected areas
api: Added AgentConversationService (getConversation, findByPlatformThread, setFirstPlatformMessageId, getPrimaryChannel, persistAgentMessage/persistAgentEdit, updateMetadata, resolveConversation) and new interfaces (ConversationActivityContext, PersistAgentActivityParams, UpdateMetadataParams, ResolveConversationParams); refactored AgentInboundHandler and HandleAgentReply to delegate state changes to the new service; removed forwardRef injection from ChatSdkService and consolidated channels[0] usage into getPrimaryChannel().
Key technical decisions
Testing
Run existing e2e tests (agent-reply.e2e.ts, agent-webhook.e2e.ts, agents.e2e.ts) and verify NestJS module bootstraps without forwardRef errors; no new unit tests were added in this diff and behavior relies on existing e2e coverage.