diff --git a/alembic/versions/033_add_thinking_text_to_messages.py b/alembic/versions/033_add_thinking_text_to_messages.py new file mode 100644 index 00000000..4bfe991a --- /dev/null +++ b/alembic/versions/033_add_thinking_text_to_messages.py @@ -0,0 +1,47 @@ +"""Add envelope-encrypted ``thinking_text`` column to messages. + +Captures the LLM's extended-thinking output (Anthropic ``thinking`` blocks) +that ``MessageResponse`` returns alongside the final assistant text. Today +the agent loop discards these blocks via ``get_response_text`` after the +last LLM call lands; with this column populated, the assistant message row +carries the reasoning that produced its ``body`` so admins can audit "why +did the agent reply this way" without re-querying the LLM. + +Encrypted at rest under ``EncryptedString`` like ``body`` / +``processed_context`` / ``tool_interactions_json``: the thinking stream +quotes back user-supplied content (names, addresses, integration payloads) +and would expose the same PII as the message body if left in plaintext. + +NOT NULL with a server default of empty string so existing rows +backfill cleanly and raw-SQL inserts in older migration tests (which +omit the column) keep working. Outbound messages persisted before this +migration ran simply read back as empty thinking; inbound rows always +have an empty value because the column is only written by the agent's +outbound persistence path. + +Revision ID: 033 +Revises: 032 +Create Date: 2026-05-07 +""" + +from __future__ import annotations + +import sqlalchemy as sa + +from alembic import op + +revision: str = "033" +down_revision: str = "032" +branch_labels: tuple[str, ...] | None = None +depends_on: tuple[str, ...] | None = None + + +def upgrade() -> None: + op.add_column( + "messages", + sa.Column("thinking_text", sa.Text(), nullable=False, server_default=""), + ) + + +def downgrade() -> None: + op.drop_column("messages", "thinking_text") diff --git a/backend/app/agent/core.py b/backend/app/agent/core.py index 8973d87d..232b6c0f 100644 --- a/backend/app/agent/core.py +++ b/backend/app/agent/core.py @@ -35,7 +35,12 @@ TurnEndEvent, TurnStartEvent, ) -from backend.app.agent.llm_parsing import ParsedToolCall, get_response_text, parse_tool_calls +from backend.app.agent.llm_parsing import ( + ParsedToolCall, + get_response_text, + get_response_thinking, + parse_tool_calls, +) from backend.app.agent.messages import ( AgentMessage, AssistantMessage, @@ -181,6 +186,13 @@ class AgentResponse: # ``dispatch_reply_step`` so ``persist_outbound`` can store the user-facing # text instead of just ``reply_text`` (the LLM's prose, pre-receipts). dispatched_body: str = "" + # Concatenated extended-thinking text from the FINAL LLM response in the + # agent loop, captured from any ``ThinkingBlock`` content. Empty when the + # provider did not return thinking blocks (thinking disabled, or + # provider does not support extended thinking). ``persist_outbound`` + # writes it onto the assistant message so admins can audit "why did + # the agent reply this way" without re-querying the LLM. + thinking_text: str = "" class ClawboltAgent: @@ -1058,6 +1070,7 @@ async def process_message( memories_saved: list[dict[str, str]] = [] tool_call_records: list[StoredToolInteraction] = [] reply_text = "" + thinking_text = "" _total_input_tokens = 0 _total_output_tokens = 0 _total_cache_creation_tokens = 0 @@ -1141,12 +1154,26 @@ async def process_message( total_cache_creation_input_tokens=_total_cache_creation_tokens, total_cache_read_input_tokens=_total_cache_read_tokens, system_prompt=system_prompt, + # Surface any reasoning that preceded the error stop so + # downstream observers (and a future persistence policy + # that records error fallbacks) can see what the model + # was working through before it bailed. Today + # ``persist_outbound`` short-circuits on + # ``is_error_fallback``, so this rides along the in-memory + # response only. + thinking_text=get_response_thinking(response), ) # Parse tool calls via shared parser parsed_raw = parse_tool_calls(response) if not parsed_raw: reply_text = get_response_text(response) + # Capture thinking from the final response only. Earlier + # rounds produced tool calls and their thinking justifies + # a tool decision rather than the user-visible reply, so + # we keep the persisted record aligned with the message + # body the user actually saw. + thinking_text = get_response_thinking(response) # Empty reply after tools is intentional silent action; do not re-prompt. if not reply_text and actions_taken: @@ -1239,6 +1266,7 @@ async def process_message( else: # Max rounds reached -- use last response content reply_text = get_response_text(response) + thinking_text = get_response_thinking(response) logger.debug("Max tool rounds (%d) reached, using last response", MAX_TOOL_ROUNDS) # Collect any messages dropped by reactive trimming (ContextLengthExceededError) @@ -1290,6 +1318,7 @@ async def process_message( total_cache_creation_input_tokens=_total_cache_creation_tokens, total_cache_read_input_tokens=_total_cache_read_tokens, system_prompt=system_prompt, + thinking_text=thinking_text, ) def _find_tool(self, name: str) -> Callable[..., Any] | None: diff --git a/backend/app/agent/dto.py b/backend/app/agent/dto.py index 010142c2..656e8b60 100644 --- a/backend/app/agent/dto.py +++ b/backend/app/agent/dto.py @@ -54,6 +54,7 @@ class StoredMessage(BaseModel): body: str = "" processed_context: str = "" tool_interactions_json: str = "" + thinking_text: str = "" external_message_id: str = "" media_urls_json: str = "[]" timestamp: str = Field(default_factory=lambda: datetime.datetime.now(datetime.UTC).isoformat()) diff --git a/backend/app/agent/heartbeat.py b/backend/app/agent/heartbeat.py index 27b47f5e..d467292b 100644 --- a/backend/app/agent/heartbeat.py +++ b/backend/app/agent/heartbeat.py @@ -1043,6 +1043,7 @@ async def run_heartbeat_for_user( direction=MessageDirection.OUTBOUND, body=reply_text, tool_interactions_json=tool_interactions, + thinking_text=response.thinking_text if response else "", ) # Record heartbeat log for persistent rate limiting diff --git a/backend/app/agent/llm_parsing.py b/backend/app/agent/llm_parsing.py index c61b21b0..2cb96f33 100644 --- a/backend/app/agent/llm_parsing.py +++ b/backend/app/agent/llm_parsing.py @@ -11,7 +11,7 @@ from dataclasses import dataclass from typing import Any -from any_llm.types.messages import MessageResponse, TextBlock, ToolUseBlock +from any_llm.types.messages import MessageResponse, TextBlock, ThinkingBlock, ToolUseBlock logger = logging.getLogger(__name__) @@ -76,3 +76,23 @@ def get_response_text(response: MessageResponse) -> str: if isinstance(block, TextBlock) and block.text: parts.append(block.text) return "".join(parts) + + +def get_response_thinking(response: MessageResponse) -> str: + """Extract the extended-thinking text from a ``MessageResponse``. + + Concatenates the ``thinking`` field across every ``ThinkingBlock`` + in ``response.content``, separated by blank lines so multi-block + streams remain readable when rendered. Returns an empty string when + no thinking blocks are present (the model didn't think, or thinking + wasn't enabled in the request). Empty thinking blocks are skipped. + + The cryptographic ``signature`` field on each block is not surfaced; + it is only meaningful for replaying the block back to Anthropic and + has no audit value to a human reader. + """ + parts: list[str] = [] + for block in response.content: + if isinstance(block, ThinkingBlock) and block.thinking: + parts.append(block.thinking) + return "\n\n".join(parts) diff --git a/backend/app/agent/router.py b/backend/app/agent/router.py index 4a6a855d..b0409cc6 100644 --- a/backend/app/agent/router.py +++ b/backend/app/agent/router.py @@ -447,6 +447,7 @@ async def persist_outbound( direction=MessageDirection.OUTBOUND, body=body, tool_interactions_json=tool_interactions, + thinking_text=response.thinking_text, ) diff --git a/backend/app/agent/session_db.py b/backend/app/agent/session_db.py index 336dd326..578dac55 100644 --- a/backend/app/agent/session_db.py +++ b/backend/app/agent/session_db.py @@ -59,6 +59,7 @@ def _msg_to_stored(msg: Message) -> StoredMessage: body=msg.body, processed_context=msg.processed_context, tool_interactions_json=msg.tool_interactions_json, + thinking_text=msg.thinking_text, external_message_id=msg.external_message_id, media_urls_json=msg.media_urls_json, timestamp=ts, @@ -205,6 +206,7 @@ def _delete_all_messages_for_session(cs_id: int) -> Delete[tuple[Message]]: "body", "processed_context", "tool_interactions_json", + "thinking_text", "external_message_id", "media_urls_json", } @@ -361,6 +363,7 @@ async def add_message( media_urls_json: str = "[]", processed_context: str = "", tool_interactions_json: str = "", + thinking_text: str = "", channel: str = "", ) -> StoredMessage: """Deprecated alias of :meth:`add_message_async`.""" @@ -372,6 +375,7 @@ async def add_message( media_urls_json=media_urls_json, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, channel=channel, ) @@ -384,6 +388,7 @@ async def add_message_async( media_urls_json: str = "[]", processed_context: str = "", tool_interactions_json: str = "", + thinking_text: str = "", channel: str = "", ) -> StoredMessage: """Insert a message into the database and update the in-memory session.""" @@ -415,6 +420,7 @@ async def add_message_async( body=body, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, external_message_id=external_message_id, media_urls_json=media_urls_json, timestamp=now, @@ -432,6 +438,7 @@ async def add_message_async( body=body, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, external_message_id=external_message_id, media_urls_json=media_urls_json, timestamp=now.isoformat(), @@ -457,6 +464,7 @@ async def add_message_by_session_id( media_urls_json: str = "[]", processed_context: str = "", tool_interactions_json: str = "", + thinking_text: str = "", channel: str = "", ) -> StoredMessage: """Deprecated alias of :meth:`add_message_by_session_id_async`.""" @@ -468,6 +476,7 @@ async def add_message_by_session_id( media_urls_json=media_urls_json, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, channel=channel, ) @@ -480,6 +489,7 @@ async def add_message_by_session_id_async( media_urls_json: str = "[]", processed_context: str = "", tool_interactions_json: str = "", + thinking_text: str = "", channel: str = "", ) -> StoredMessage: """Insert a message using only the session_id (no SessionState needed). @@ -506,6 +516,7 @@ async def add_message_by_session_id_async( body=body, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, external_message_id=external_message_id, media_urls_json=media_urls_json, timestamp=now, @@ -521,6 +532,7 @@ async def add_message_by_session_id_async( body=body, processed_context=processed_context, tool_interactions_json=tool_interactions_json, + thinking_text=thinking_text, external_message_id=external_message_id, media_urls_json=media_urls_json, timestamp=now.isoformat(), diff --git a/backend/app/models.py b/backend/app/models.py index ba871ad8..77b0137e 100644 --- a/backend/app/models.py +++ b/backend/app/models.py @@ -282,14 +282,17 @@ class Message(Base): """A single message in a conversation, inbound or outbound. User-authored content (``body``, ``processed_context``, - ``tool_interactions_json``) is envelope-encrypted at rest via - ``EncryptedString``. ``body`` is the raw text the user / channel - sent; ``processed_context`` is the same content after media + ``tool_interactions_json``, ``thinking_text``) is envelope-encrypted + at rest via ``EncryptedString``. ``body`` is the raw text the user / + channel sent; ``processed_context`` is the same content after media transcription / OCR / preprocessing; ``tool_interactions_json`` holds tool call args / results that frequently embed customer names, phone numbers, and addresses passed to QuickBooks / - CompanyCam / calendar tools. The decrypt path runs transparently - on every ORM read, so application code keeps reading + CompanyCam / calendar tools. ``thinking_text`` holds the LLM's + extended-thinking blocks for outbound messages (empty for inbound), + which can quote user content back at length and so receives the + same encryption treatment. The decrypt path runs transparently on + every ORM read, so application code keeps reading ``msg.tool_interactions_json`` and gets plaintext JSON. Other text columns intentionally left plaintext: @@ -316,6 +319,11 @@ class Message(Base): tool_interactions_json: Mapped[str] = mapped_column( EncryptedString(table="messages", column="tool_interactions_json"), default="" ) + thinking_text: Mapped[str] = mapped_column( + EncryptedString(table="messages", column="thinking_text"), + default="", + server_default="", + ) external_message_id: Mapped[str] = mapped_column(String, default="") media_urls_json: Mapped[str] = mapped_column(Text, default="") timestamp: Mapped[datetime] = mapped_column( diff --git a/tests/test_llm_parsing.py b/tests/test_llm_parsing.py index bed3c314..e3304039 100644 --- a/tests/test_llm_parsing.py +++ b/tests/test_llm_parsing.py @@ -1,14 +1,26 @@ """Tests for shared LLM response parsing utilities.""" import json - -from any_llm.types.messages import MessageResponse, MessageUsage, TextBlock, ToolUseBlock - -from backend.app.agent.llm_parsing import ParsedToolCall, get_response_text, parse_tool_calls +from typing import Any + +from any_llm.types.messages import ( + MessageResponse, + MessageUsage, + TextBlock, + ThinkingBlock, + ToolUseBlock, +) + +from backend.app.agent.llm_parsing import ( + ParsedToolCall, + get_response_text, + get_response_thinking, + parse_tool_calls, +) from tests.mocks.llm import make_text_response, make_tool_call_response -def _make_response(blocks: list[TextBlock | ToolUseBlock]) -> MessageResponse: +def _make_response(blocks: list[Any]) -> MessageResponse: """Helper to build a MessageResponse from content blocks. Uses ``model_construct`` to bypass pydantic validation so tests can @@ -117,3 +129,51 @@ def test_returns_empty_for_empty_content(self) -> None: """Should return empty string when content list is empty.""" resp = _make_response([]) assert get_response_text(resp) == "" + + +class TestGetResponseThinking: + def test_returns_thinking_block_text(self) -> None: + """A single thinking block should be returned verbatim.""" + resp = _make_response( + [ + ThinkingBlock( + type="thinking", + thinking="The user wants to know about X. I should look up Y.", + signature="sig", + ), + TextBlock(type="text", text="Here is what I found."), + ] + ) + assert get_response_thinking(resp) == "The user wants to know about X. I should look up Y." + + def test_concatenates_multiple_thinking_blocks_with_blank_line(self) -> None: + """Multiple thinking blocks join with a blank line for readability.""" + resp = _make_response( + [ + ThinkingBlock(type="thinking", thinking="First thought.", signature=""), + TextBlock(type="text", text="reply"), + ThinkingBlock(type="thinking", thinking="Second thought.", signature=""), + ] + ) + assert get_response_thinking(resp) == "First thought.\n\nSecond thought." + + def test_returns_empty_when_no_thinking_blocks(self) -> None: + """A response with text and tool blocks but no thinking returns empty.""" + resp = make_text_response("plain reply") + assert get_response_thinking(resp) == "" + + def test_skips_empty_thinking_blocks(self) -> None: + """An empty thinking string should not produce a stray separator.""" + resp = _make_response( + [ + ThinkingBlock(type="thinking", thinking="", signature=""), + ThinkingBlock(type="thinking", thinking="real thought", signature=""), + ThinkingBlock(type="thinking", thinking="", signature=""), + ] + ) + assert get_response_thinking(resp) == "real thought" + + def test_returns_empty_for_empty_content(self) -> None: + """Empty content list returns empty thinking.""" + resp = _make_response([]) + assert get_response_thinking(resp) == "" diff --git a/tests/test_session_db_async.py b/tests/test_session_db_async.py index 5dbd17e0..7a5acef1 100644 --- a/tests/test_session_db_async.py +++ b/tests/test_session_db_async.py @@ -161,6 +161,38 @@ async def test_add_message_async_inserts_and_assigns_seq( assert session.messages[-1].body == "hello" +async def test_add_message_async_round_trips_thinking_text( + async_db: async_sessionmaker, +) -> None: + """Outbound messages persist and re-read the LLM's extended-thinking text. + + Guards three things at once: the new ``thinking_text`` column is + written by ``add_message_async``, the ``EncryptedString`` decorator + round-trips the value (a non-envelope value would raise on read), and + the in-memory ``StoredMessage`` carries the same field so callers do + not have to re-fetch. + """ + user_id = await _create_user(async_db) + store = SessionStore(user_id) + session, _ = await store.get_or_create_session_async() + + saved = await store.add_message_async( + session, + direction="outbound", + body="here is the answer", + thinking_text="step 1: parse the ask\nstep 2: pick the tool\nstep 3: phrase the reply", + ) + assert saved.thinking_text == ( + "step 1: parse the ask\nstep 2: pick the tool\nstep 3: phrase the reply" + ) + + reloaded = await store.load_session_async(session.session_id) + assert reloaded is not None + assert reloaded.messages[-1].thinking_text == ( + "step 1: parse the ask\nstep 2: pick the tool\nstep 3: phrase the reply" + ) + + async def test_add_message_by_session_id_async_assigns_seq_independently( async_db: async_sessionmaker, ) -> None: