Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions alembic/versions/033_add_thinking_text_to_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Add envelope-encrypted ``thinking_text`` column to messages.

Captures the LLM's extended-thinking output (Anthropic ``thinking`` blocks)
that ``MessageResponse`` returns alongside the final assistant text. Today
the agent loop discards these blocks via ``get_response_text`` after the
last LLM call lands; with this column populated, the assistant message row
carries the reasoning that produced its ``body`` so admins can audit "why
did the agent reply this way" without re-querying the LLM.

Encrypted at rest under ``EncryptedString`` like ``body`` /
``processed_context`` / ``tool_interactions_json``: the thinking stream
quotes back user-supplied content (names, addresses, integration payloads)
and would expose the same PII as the message body if left in plaintext.

NOT NULL with a server default of empty string so existing rows
backfill cleanly and raw-SQL inserts in older migration tests (which
omit the column) keep working. Outbound messages persisted before this
migration ran simply read back as empty thinking; inbound rows always
have an empty value because the column is only written by the agent's
outbound persistence path.

Revision ID: 033
Revises: 032
Create Date: 2026-05-07
"""

from __future__ import annotations

import sqlalchemy as sa

from alembic import op

revision: str = "033"
down_revision: str = "032"
branch_labels: tuple[str, ...] | None = None
depends_on: tuple[str, ...] | None = None


def upgrade() -> None:
    """Add the NOT NULL ``thinking_text`` column to ``messages``.

    The empty-string server default lets existing rows backfill without a
    data migration and keeps raw-SQL inserts that omit the column working.
    """
    thinking_column = sa.Column(
        "thinking_text",
        sa.Text(),
        nullable=False,
        server_default="",
    )
    op.add_column("messages", thinking_column)


def downgrade() -> None:
    """Drop the ``thinking_text`` column from ``messages`` again."""
    op.drop_column("messages", "thinking_text")
31 changes: 30 additions & 1 deletion backend/app/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
TurnEndEvent,
TurnStartEvent,
)
from backend.app.agent.llm_parsing import ParsedToolCall, get_response_text, parse_tool_calls
from backend.app.agent.llm_parsing import (
ParsedToolCall,
get_response_text,
get_response_thinking,
parse_tool_calls,
)
from backend.app.agent.messages import (
AgentMessage,
AssistantMessage,
Expand Down Expand Up @@ -181,6 +186,13 @@ class AgentResponse:
# ``dispatch_reply_step`` so ``persist_outbound`` can store the user-facing
# text instead of just ``reply_text`` (the LLM's prose, pre-receipts).
dispatched_body: str = ""
# Concatenated extended-thinking text from the FINAL LLM response in the
# agent loop, captured from any ``ThinkingBlock`` content. Empty when the
# provider did not return thinking blocks (thinking disabled, or
# provider does not support extended thinking). ``persist_outbound``
# writes it onto the assistant message so admins can audit "why did
# the agent reply this way" without re-querying the LLM.
thinking_text: str = ""


class ClawboltAgent:
Expand Down Expand Up @@ -1058,6 +1070,7 @@ async def process_message(
memories_saved: list[dict[str, str]] = []
tool_call_records: list[StoredToolInteraction] = []
reply_text = ""
thinking_text = ""
_total_input_tokens = 0
_total_output_tokens = 0
_total_cache_creation_tokens = 0
Expand Down Expand Up @@ -1141,12 +1154,26 @@ async def process_message(
total_cache_creation_input_tokens=_total_cache_creation_tokens,
total_cache_read_input_tokens=_total_cache_read_tokens,
system_prompt=system_prompt,
# Surface any reasoning that preceded the error stop so
# downstream observers (and a future persistence policy
# that records error fallbacks) can see what the model
# was working through before it bailed. Today
# ``persist_outbound`` short-circuits on
# ``is_error_fallback``, so this rides along the in-memory
# response only.
thinking_text=get_response_thinking(response),
)

# Parse tool calls via shared parser
parsed_raw = parse_tool_calls(response)
if not parsed_raw:
reply_text = get_response_text(response)
# Capture thinking from the final response only. Earlier
# rounds produced tool calls and their thinking justifies
# a tool decision rather than the user-visible reply, so
# we keep the persisted record aligned with the message
# body the user actually saw.
thinking_text = get_response_thinking(response)

# Empty reply after tools is intentional silent action; do not re-prompt.
if not reply_text and actions_taken:
Expand Down Expand Up @@ -1239,6 +1266,7 @@ async def process_message(
else:
# Max rounds reached -- use last response content
reply_text = get_response_text(response)
thinking_text = get_response_thinking(response)
logger.debug("Max tool rounds (%d) reached, using last response", MAX_TOOL_ROUNDS)

# Collect any messages dropped by reactive trimming (ContextLengthExceededError)
Expand Down Expand Up @@ -1290,6 +1318,7 @@ async def process_message(
total_cache_creation_input_tokens=_total_cache_creation_tokens,
total_cache_read_input_tokens=_total_cache_read_tokens,
system_prompt=system_prompt,
thinking_text=thinking_text,
)

def _find_tool(self, name: str) -> Callable[..., Any] | None:
Expand Down
1 change: 1 addition & 0 deletions backend/app/agent/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class StoredMessage(BaseModel):
body: str = ""
processed_context: str = ""
tool_interactions_json: str = ""
thinking_text: str = ""
external_message_id: str = ""
media_urls_json: str = "[]"
timestamp: str = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC).isoformat())
Expand Down
1 change: 1 addition & 0 deletions backend/app/agent/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ async def run_heartbeat_for_user(
direction=MessageDirection.OUTBOUND,
body=reply_text,
tool_interactions_json=tool_interactions,
thinking_text=response.thinking_text if response else "",
)

# Record heartbeat log for persistent rate limiting
Expand Down
22 changes: 21 additions & 1 deletion backend/app/agent/llm_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from dataclasses import dataclass
from typing import Any

from any_llm.types.messages import MessageResponse, TextBlock, ToolUseBlock
from any_llm.types.messages import MessageResponse, TextBlock, ThinkingBlock, ToolUseBlock

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,3 +76,23 @@ def get_response_text(response: MessageResponse) -> str:
if isinstance(block, TextBlock) and block.text:
parts.append(block.text)
return "".join(parts)


def get_response_thinking(response: MessageResponse) -> str:
    """Extract the extended-thinking text from a ``MessageResponse``.

    Joins the ``thinking`` field of every non-empty ``ThinkingBlock`` in
    ``response.content`` with a blank line between blocks, so multi-block
    streams stay readable when rendered. Returns an empty string when the
    response carries no thinking blocks (the model didn't think, or
    thinking wasn't enabled in the request); blank thinking blocks are
    filtered out so they never produce stray separators.

    Each block's cryptographic ``signature`` field is deliberately dropped:
    it only matters when replaying the block back to Anthropic and carries
    no audit value for a human reader.
    """
    fragments = [
        block.thinking
        for block in response.content
        if isinstance(block, ThinkingBlock) and block.thinking
    ]
    return "\n\n".join(fragments)
1 change: 1 addition & 0 deletions backend/app/agent/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ async def persist_outbound(
direction=MessageDirection.OUTBOUND,
body=body,
tool_interactions_json=tool_interactions,
thinking_text=response.thinking_text,
)


Expand Down
12 changes: 12 additions & 0 deletions backend/app/agent/session_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def _msg_to_stored(msg: Message) -> StoredMessage:
body=msg.body,
processed_context=msg.processed_context,
tool_interactions_json=msg.tool_interactions_json,
thinking_text=msg.thinking_text,
external_message_id=msg.external_message_id,
media_urls_json=msg.media_urls_json,
timestamp=ts,
Expand Down Expand Up @@ -205,6 +206,7 @@ def _delete_all_messages_for_session(cs_id: int) -> Delete[tuple[Message]]:
"body",
"processed_context",
"tool_interactions_json",
"thinking_text",
"external_message_id",
"media_urls_json",
}
Expand Down Expand Up @@ -361,6 +363,7 @@ async def add_message(
media_urls_json: str = "[]",
processed_context: str = "",
tool_interactions_json: str = "",
thinking_text: str = "",
channel: str = "",
) -> StoredMessage:
"""Deprecated alias of :meth:`add_message_async`."""
Expand All @@ -372,6 +375,7 @@ async def add_message(
media_urls_json=media_urls_json,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
channel=channel,
)

Expand All @@ -384,6 +388,7 @@ async def add_message_async(
media_urls_json: str = "[]",
processed_context: str = "",
tool_interactions_json: str = "",
thinking_text: str = "",
channel: str = "",
) -> StoredMessage:
"""Insert a message into the database and update the in-memory session."""
Expand Down Expand Up @@ -415,6 +420,7 @@ async def add_message_async(
body=body,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
external_message_id=external_message_id,
media_urls_json=media_urls_json,
timestamp=now,
Expand All @@ -432,6 +438,7 @@ async def add_message_async(
body=body,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
external_message_id=external_message_id,
media_urls_json=media_urls_json,
timestamp=now.isoformat(),
Expand All @@ -457,6 +464,7 @@ async def add_message_by_session_id(
media_urls_json: str = "[]",
processed_context: str = "",
tool_interactions_json: str = "",
thinking_text: str = "",
channel: str = "",
) -> StoredMessage:
"""Deprecated alias of :meth:`add_message_by_session_id_async`."""
Expand All @@ -468,6 +476,7 @@ async def add_message_by_session_id(
media_urls_json=media_urls_json,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
channel=channel,
)

Expand All @@ -480,6 +489,7 @@ async def add_message_by_session_id_async(
media_urls_json: str = "[]",
processed_context: str = "",
tool_interactions_json: str = "",
thinking_text: str = "",
channel: str = "",
) -> StoredMessage:
"""Insert a message using only the session_id (no SessionState needed).
Expand All @@ -506,6 +516,7 @@ async def add_message_by_session_id_async(
body=body,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
external_message_id=external_message_id,
media_urls_json=media_urls_json,
timestamp=now,
Expand All @@ -521,6 +532,7 @@ async def add_message_by_session_id_async(
body=body,
processed_context=processed_context,
tool_interactions_json=tool_interactions_json,
thinking_text=thinking_text,
external_message_id=external_message_id,
media_urls_json=media_urls_json,
timestamp=now.isoformat(),
Expand Down
18 changes: 13 additions & 5 deletions backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,17 @@ class Message(Base):
"""A single message in a conversation, inbound or outbound.

User-authored content (``body``, ``processed_context``,
``tool_interactions_json``) is envelope-encrypted at rest via
``EncryptedString``. ``body`` is the raw text the user / channel
sent; ``processed_context`` is the same content after media
``tool_interactions_json``, ``thinking_text``) is envelope-encrypted
at rest via ``EncryptedString``. ``body`` is the raw text the user /
channel sent; ``processed_context`` is the same content after media
transcription / OCR / preprocessing; ``tool_interactions_json``
holds tool call args / results that frequently embed customer
names, phone numbers, and addresses passed to QuickBooks /
CompanyCam / calendar tools. The decrypt path runs transparently
on every ORM read, so application code keeps reading
CompanyCam / calendar tools. ``thinking_text`` holds the LLM's
extended-thinking blocks for outbound messages (empty for inbound),
which can quote user content back at length and so receives the
same encryption treatment. The decrypt path runs transparently on
every ORM read, so application code keeps reading
``msg.tool_interactions_json`` and gets plaintext JSON.

Other text columns intentionally left plaintext:
Expand All @@ -316,6 +319,11 @@ class Message(Base):
tool_interactions_json: Mapped[str] = mapped_column(
EncryptedString(table="messages", column="tool_interactions_json"), default=""
)
thinking_text: Mapped[str] = mapped_column(
EncryptedString(table="messages", column="thinking_text"),
default="",
server_default="",
)
external_message_id: Mapped[str] = mapped_column(String, default="")
media_urls_json: Mapped[str] = mapped_column(Text, default="")
timestamp: Mapped[datetime] = mapped_column(
Expand Down
70 changes: 65 additions & 5 deletions tests/test_llm_parsing.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
"""Tests for shared LLM response parsing utilities."""

import json

from any_llm.types.messages import MessageResponse, MessageUsage, TextBlock, ToolUseBlock

from backend.app.agent.llm_parsing import ParsedToolCall, get_response_text, parse_tool_calls
from typing import Any

from any_llm.types.messages import (
MessageResponse,
MessageUsage,
TextBlock,
ThinkingBlock,
ToolUseBlock,
)

from backend.app.agent.llm_parsing import (
ParsedToolCall,
get_response_text,
get_response_thinking,
parse_tool_calls,
)
from tests.mocks.llm import make_text_response, make_tool_call_response


def _make_response(blocks: list[TextBlock | ToolUseBlock]) -> MessageResponse:
def _make_response(blocks: list[Any]) -> MessageResponse:
"""Helper to build a MessageResponse from content blocks.

Uses ``model_construct`` to bypass pydantic validation so tests can
Expand Down Expand Up @@ -117,3 +129,51 @@ def test_returns_empty_for_empty_content(self) -> None:
"""Should return empty string when content list is empty."""
resp = _make_response([])
assert get_response_text(resp) == ""


class TestGetResponseThinking:
    """Behavior of ``get_response_thinking`` across block mixes."""

    def test_returns_thinking_block_text(self) -> None:
        """A single thinking block should be returned verbatim."""
        blocks = [
            ThinkingBlock(
                type="thinking",
                thinking="The user wants to know about X. I should look up Y.",
                signature="sig",
            ),
            TextBlock(type="text", text="Here is what I found."),
        ]
        response = _make_response(blocks)
        expected = "The user wants to know about X. I should look up Y."
        assert get_response_thinking(response) == expected

    def test_concatenates_multiple_thinking_blocks_with_blank_line(self) -> None:
        """Multiple thinking blocks join with a blank line for readability."""
        blocks = [
            ThinkingBlock(type="thinking", thinking="First thought.", signature=""),
            TextBlock(type="text", text="reply"),
            ThinkingBlock(type="thinking", thinking="Second thought.", signature=""),
        ]
        response = _make_response(blocks)
        assert get_response_thinking(response) == "First thought.\n\nSecond thought."

    def test_returns_empty_when_no_thinking_blocks(self) -> None:
        """A response with text and tool blocks but no thinking returns empty."""
        response = make_text_response("plain reply")
        assert get_response_thinking(response) == ""

    def test_skips_empty_thinking_blocks(self) -> None:
        """An empty thinking string should not produce a stray separator."""
        blocks = [
            ThinkingBlock(type="thinking", thinking="", signature=""),
            ThinkingBlock(type="thinking", thinking="real thought", signature=""),
            ThinkingBlock(type="thinking", thinking="", signature=""),
        ]
        response = _make_response(blocks)
        assert get_response_thinking(response) == "real thought"

    def test_returns_empty_for_empty_content(self) -> None:
        """Empty content list returns empty thinking."""
        response = _make_response([])
        assert get_response_thinking(response) == ""
Loading
Loading