diff --git a/graphiti_core/prompts/extract_edges.py b/graphiti_core/prompts/extract_edges.py
index 55b4a901b..c4b836301 100644
--- a/graphiti_core/prompts/extract_edges.py
+++ b/graphiti_core/prompts/extract_edges.py
@@ -46,9 +46,9 @@ class Edge(BaseModel):
description='The date and time when the relationship described by the edge fact stopped being true or ended. Use ISO 8601 format (YYYY-MM-DDTHH:MM:SS.SSSSSSZ)',
)
episode_indices: list[int] = Field(
- default_factory=lambda: [1],
- description='List of episode numbers (1-indexed) that this fact was derived from. '
- 'When processing a single episode, this should be [1].',
+ default_factory=lambda: [0],
+ description='List of episode numbers (0-indexed) that this fact was derived from. '
+ 'When processing a single episode, this should be [0].',
)
@@ -56,14 +56,39 @@ class ExtractedEdges(BaseModel):
edges: list[Edge]
+class EdgeTimestamps(BaseModel):
+ """Temporal bounds extracted from a fact."""
+
+ valid_at: str | None = Field(
+ None,
+ description='When the fact became true. ISO 8601 with Z suffix (e.g., 2025-04-30T00:00:00Z)',
+ )
+ invalid_at: str | None = Field(
+ None,
+ description='When the fact stopped being true. ISO 8601 with Z suffix (e.g., 2025-04-30T00:00:00Z)',
+ )
+
+
+class BatchEdgeTimestamps(BaseModel):
+ """Temporal bounds for a batch of facts."""
+
+ timestamps: list[EdgeTimestamps] = Field(
+ ..., description='Timestamps for each fact, in the same order as the input facts'
+ )
+
+
class Prompt(Protocol):
edge: PromptVersion
extract_attributes: PromptVersion
+ extract_timestamps: PromptVersion
+ extract_timestamps_batch: PromptVersion
class Versions(TypedDict):
edge: PromptFunction
extract_attributes: PromptFunction
+ extract_timestamps: PromptFunction
+ extract_timestamps_batch: PromptFunction
def edge(context: dict[str, Any]) -> list[Message]:
@@ -127,8 +152,8 @@ def edge(context: dict[str, Any]) -> list[Message]:
- GOOD: "Nate plays games on a Gamecube" → Nate -> PLAYS_GAMES_ON -> Gamecube (when "Gamecube" is in ENTITIES)
- GOOD: "Alice congratulated Bob" (relationship between two entities), "Alice lives in Paris" (relationship between entity and place)
4. Do not emit semantically redundant facts, even across episodes within the CURRENT_MESSAGE. However, if a later episode adds specific details to a previously stated fact (e.g., adding a brand name, a count, a color, a location, or any concrete attribute), extract the more detailed version as a NEW fact — it is NOT a duplicate. Only treat facts as duplicates when they convey the same specificity.
- - NOT a duplicate: "user plays video games" (episode 1) vs. "user plays games on a Gamecube" (episode 2) → extract the second, more detailed fact.
- - IS a duplicate: "user plays games on a Gamecube" (episode 1) vs. "user plays Gamecube games" (episode 2) → extract once, list both episodes in `episode_indices`.
+ - NOT a duplicate: "user plays video games" (Episode 0) vs. "user plays games on a Gamecube" (Episode 1) → extract the second, more detailed fact.
+ - IS a duplicate: "user plays games on a Gamecube" (Episode 0) vs. "user plays Gamecube games" (Episode 1) → extract once, list both episodes in `episode_indices`.
5. The `fact` MUST preserve all specific details from the source text: proper nouns, brand names, product names, model numbers, quantities, counts, colors, materials, physical descriptions, specific items, named locations, and named activities. Paraphrase the sentence structure but NEVER generalize:
- NEVER generalize "Gamecube" to "gaming console", "Ford Mustang" to "car", "wool coat" to "coat", "red and purple lighting" to "lighting", "cracked windshield" to "car damage", or "three screenplays" to "several screenplays".
- Do not verbatim quote the original text, but every concrete noun, number, and descriptor in the source should survive into the `fact`.
@@ -188,7 +213,71 @@ def extract_attributes(context: dict[str, Any]) -> list[Message]:
]
+def extract_timestamps(context: dict[str, Any]) -> list[Message]:
+ return [
+ Message(
+ role='system',
+ content='You extract temporal bounds from facts. NEVER hallucinate dates.',
+ ),
+ Message(
+ role='user',
+ content=f"""Given a FACT and its REFERENCE TIME, determine when the fact became true
+(valid_at) and when it stopped being true (invalid_at).
+
+Rules:
+- Resolve relative expressions ("last week", "2 years ago", "yesterday") using REFERENCE TIME.
+- If the fact is ongoing (present tense), set valid_at to REFERENCE TIME.
+- If a change or end is expressed, set invalid_at to the relevant time.
+- Leave both null if no time is stated or resolvable.
+- If only a date is mentioned (no time), assume 00:00:00.
+- Use ISO 8601 with Z suffix (e.g., 2025-04-30T00:00:00Z).
+- Do NOT hallucinate or infer dates from unrelated events.
+
+<FACT>
+{context['fact']}
+</FACT>
+
+<REFERENCE TIME>
+{context['reference_time']}
+</REFERENCE TIME>
+""",
+ ),
+ ]
+
+
+def extract_timestamps_batch(context: dict[str, Any]) -> list[Message]:
+ return [
+ Message(
+ role='system',
+ content='You extract temporal bounds from facts. NEVER hallucinate dates.',
+ ),
+ Message(
+ role='user',
+ content=f"""Given a list of FACTS with their REFERENCE TIMES, determine when each fact
+became true (valid_at) and when it stopped being true (invalid_at).
+
+Rules:
+- Resolve relative expressions ("last week", "2 years ago", "yesterday") using each fact's REFERENCE TIME.
+- If the fact is ongoing (present tense), set valid_at to its REFERENCE TIME.
+- If a change or end is expressed, set invalid_at to the relevant time.
+- Leave both null if no time is stated or resolvable.
+- If only a date is mentioned (no time), assume 00:00:00.
+- Use ISO 8601 with Z suffix (e.g., 2025-04-30T00:00:00Z).
+- Do NOT hallucinate or infer dates from unrelated events.
+
+Return one timestamps entry per fact, in the same order.
+
+<FACTS>
+{to_prompt_json(context['facts'])}
+</FACTS>
+""",
+ ),
+ ]
+
+
versions: Versions = {
'edge': edge,
'extract_attributes': extract_attributes,
+ 'extract_timestamps': extract_timestamps,
+ 'extract_timestamps_batch': extract_timestamps_batch,
}
diff --git a/graphiti_core/prompts/extract_nodes.py b/graphiti_core/prompts/extract_nodes.py
index 9f2b7601c..5b41623db 100644
--- a/graphiti_core/prompts/extract_nodes.py
+++ b/graphiti_core/prompts/extract_nodes.py
@@ -32,9 +32,9 @@ class ExtractedEntity(BaseModel):
'Must be one of the provided entity_type_id integers.',
)
episode_indices: list[int] = Field(
- default_factory=lambda: [1],
- description='List of episode numbers (1-indexed) this entity was extracted from. '
- 'When processing a single episode, this should be [1].',
+ default_factory=lambda: [0],
+ description='List of episode numbers (0-indexed) this entity was extracted from. '
+ 'When processing a single episode, this should be [0].',
)
diff --git a/graphiti_core/prompts/extract_nodes_and_edges.py b/graphiti_core/prompts/extract_nodes_and_edges.py
new file mode 100644
index 000000000..a29336551
--- /dev/null
+++ b/graphiti_core/prompts/extract_nodes_and_edges.py
@@ -0,0 +1,154 @@
+"""
+Copyright 2024, Zep Software, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from typing import Any, Protocol, TypedDict
+
+from pydantic import BaseModel, Field
+
+from .models import Message, PromptFunction, PromptVersion
+from .prompt_helpers import to_prompt_json
+
+
+class CombinedEntity(BaseModel):
+ """Entity extracted by the combined node+edge extraction prompt."""
+
+ name: str = Field(..., description='Name of the extracted entity')
+ entity_type_id: int = Field(
+ description='ID of the classified entity type. '
+ 'Must be one of the provided entity_type_id integers.',
+ )
+
+
+class CombinedFact(BaseModel):
+ """Relationship fact extracted by the combined node+edge extraction prompt."""
+
+ source_entity_name: str = Field(
+ ..., description='The name of the source entity from the extracted entities list'
+ )
+ target_entity_name: str = Field(
+ ..., description='The name of the target entity from the extracted entities list'
+ )
+ relation_type: str = Field(
+ ...,
+ description='The type of relationship between the entities, in SCREAMING_SNAKE_CASE '
+ '(e.g., WORKS_AT, LIVES_IN, IS_FRIENDS_WITH)',
+ )
+ fact: str = Field(
+ ...,
+ description='A self-contained natural language description of the relationship, '
+ 'paraphrased from the source text with all specific details preserved',
+ )
+ episode_indices: list[int] = Field(
+ default_factory=lambda: [0],
+ description='List of episode numbers (0-indexed) that this fact was derived from. '
+ 'When processing a single episode, this should be [0].',
+ )
+
+
+class CombinedExtraction(BaseModel):
+ """Combined node and edge extraction response."""
+
+ extracted_entities: list[CombinedEntity] = Field(..., description='List of extracted entities')
+ edges: list[CombinedFact] = Field(..., description='List of extracted relationship facts')
+
+
+class Prompt(Protocol):
+ extract_message: PromptVersion
+
+
+class Versions(TypedDict):
+ extract_message: PromptFunction
+
+
+def extract_message(context: dict[str, Any]) -> list[Message]:
+ sys_prompt = (
+ 'You are an expert knowledge graph extraction specialist for an AI agent memory system. '
+ 'You extract both entity nodes and relationship facts from conversations in a single pass. '
+ 'The extracted graph will be searched later by an AI agent to answer questions, personalize '
+ 'responses, and maintain long-term memory. The original conversation will NOT be available '
+ 'at retrieval time — only the entities and facts you extract will survive.'
+ )
+
+ user_prompt = f"""
+ENTITY RULES:
+1. Extract speakers and named entities explicitly mentioned in CURRENT MESSAGES.
+2. Entity names must be at most 5 words. Use the most specific form mentioned.
+3. When someone discusses their possession, project, pet, or creation, extract it
+ as a SEPARATE possessive entity — not just the person, not just the bare noun:
+ GOOD: "James's notebook", "Calvin's guitar", "Audrey's dogs", "Sam's cooking class"
+ BAD: "notebook", "guitar", "dogs" (too generic) or just "James" (collapses detail)
+4. Extract hobbies and activities as entities when someone engages in them:
+ "video games", "watercolor painting", "VR gaming", "road cycling", "cooking"
+5. Extract named/described objects ("Gamecube", "Ford Mustang", "wool coat") and
+ places ("Riverside Park", "the gym", "the beach") — not bare generics ("car", "coat").
+6. Do NOT extract: pronouns, vague abstractions (balance, growth, motivation),
+ filler nouns (day, life, stuff, time), dates as entities, full sentences as names.
+7. Each entity appears exactly ONCE. Classify using the ENTITY TYPES provided.
+8. Only extract entities from CURRENT MESSAGES — PREVIOUS MESSAGES are context only.
+
+FACT RULES:
+1. source_entity_name and target_entity_name must match your extracted entity names.
+2. When a fact involves two entities that are BOTH in your extracted entities list,
+ you MUST use both as source and target — never collapse into a self-referencing fact:
+ "Nate plays games on a Gamecube" → Nate -> PLAYS_GAMES_ON -> Gamecube
+ "Sarah lives in San Francisco" → Sarah -> LIVES_IN -> San Francisco
+ "James has a dog named Maximilian" → James -> HAS_PET -> Maximilian
+ Only use a self-referencing fact when no second entity in your list fits.
+ Self-referencing facts are still common and valuable — do NOT skip them:
+ - Routines/health: "Deborah goes jogging every morning", "Evan has a knee injury"
+ - Preferences/plans: "Nate's favorite game is Xenoblade Chronicles",
+ "Jon said he would not quit on his dreams"
+ - Emotions/states: "Sam feels he lacks motivation"
+3. Facts must be SELF-CONTAINED — understandable without the original episode.
+ Use entity names, not pronouns. Preserve specific details where possible.
+4. Extract facts from EVERY episode — not just the first. Process each episode's
+ CURRENT_MESSAGE independently. Set `episode_indices` to the 0-based episode
+ number(s) each fact comes from (matching [Episode N] headers).
+ If the SAME fact appears across multiple episodes, extract it ONCE and list ALL
+ episode indices — do NOT emit duplicate facts with different episode numbers.
+5. You MAY use PREVIOUS MESSAGES to resolve what the current message refers to.
+ If the current message reacts to or confirms prior context, extract the full
+ contextualized fact (e.g., "all the hard work paid off" → extract what paid off).
+6. Extract liberally — when in doubt, extract the fact. Preferences, opinions,
+ reactions, advice, plans, states, and experiences are all valuable. Only skip
+ content-free utterances like "Hi!", "Bye!", "Thanks!".
+7. Do not emit redundant facts across episodes. But if a later episode adds new
+ details (brand, count, location), extract the more detailed version as a new fact.
+
+<ENTITY TYPES>
+{context['entity_types']}
+</ENTITY TYPES>
+
+<PREVIOUS MESSAGES>
+{to_prompt_json([ep for ep in context['previous_episodes']])}
+</PREVIOUS MESSAGES>
+
+<CURRENT MESSAGES>
+{context['episode_content']}
+</CURRENT MESSAGES>
+
+{context['custom_extraction_instructions']}
+"""
+
+ return [
+ Message(role='system', content=sys_prompt),
+ Message(role='user', content=user_prompt),
+ ]
+
+
+versions: Versions = {
+ 'extract_message': extract_message,
+}
diff --git a/graphiti_core/prompts/lib.py b/graphiti_core/prompts/lib.py
index 34979b6c8..72a8199c8 100644
--- a/graphiti_core/prompts/lib.py
+++ b/graphiti_core/prompts/lib.py
@@ -31,6 +31,9 @@
from .extract_nodes import Prompt as ExtractNodesPrompt
from .extract_nodes import Versions as ExtractNodesVersions
from .extract_nodes import versions as extract_nodes_versions
+from .extract_nodes_and_edges import Prompt as ExtractNodesAndEdgesPrompt
+from .extract_nodes_and_edges import Versions as ExtractNodesAndEdgesVersions
+from .extract_nodes_and_edges import versions as extract_nodes_and_edges_versions
from .models import Message, PromptFunction
from .prompt_helpers import DO_NOT_ESCAPE_UNICODE
from .summarize_nodes import Prompt as SummarizeNodesPrompt
@@ -45,6 +48,7 @@ class PromptLibrary(Protocol):
extract_nodes: ExtractNodesPrompt
dedupe_nodes: DedupeNodesPrompt
extract_edges: ExtractEdgesPrompt
+ extract_nodes_and_edges: ExtractNodesAndEdgesPrompt
dedupe_edges: DedupeEdgesPrompt
summarize_nodes: SummarizeNodesPrompt
summarize_sagas: SummarizeSagasPrompt
@@ -55,6 +59,7 @@ class PromptLibraryImpl(TypedDict):
extract_nodes: ExtractNodesVersions
dedupe_nodes: DedupeNodesVersions
extract_edges: ExtractEdgesVersions
+ extract_nodes_and_edges: ExtractNodesAndEdgesVersions
dedupe_edges: DedupeEdgesVersions
summarize_nodes: SummarizeNodesVersions
summarize_sagas: SummarizeSagasVersions
@@ -88,6 +93,7 @@ def __init__(self, library: PromptLibraryImpl):
'extract_nodes': extract_nodes_versions,
'dedupe_nodes': dedupe_nodes_versions,
'extract_edges': extract_edges_versions,
+ 'extract_nodes_and_edges': extract_nodes_and_edges_versions,
'dedupe_edges': dedupe_edges_versions,
'summarize_nodes': summarize_nodes_versions,
'summarize_sagas': summarize_sagas_versions,
diff --git a/graphiti_core/utils/bulk_utils.py b/graphiti_core/utils/bulk_utils.py
index ad4cc451f..92f25582e 100644
--- a/graphiti_core/utils/bulk_utils.py
+++ b/graphiti_core/utils/bulk_utils.py
@@ -268,7 +268,75 @@ async def extract_nodes_and_edges_bulk(
excluded_entity_types: list[str] | None = None,
edge_types: dict[str, type[BaseModel]] | None = None,
custom_extraction_instructions: str | None = None,
+ use_combined_extraction: bool = False,
) -> tuple[list[list[EntityNode]], list[list[EntityEdge]]]:
+ if use_combined_extraction:
+ return await _extract_nodes_and_edges_bulk_combined(
+ clients,
+ episode_tuples,
+ edge_type_map,
+ entity_types=entity_types,
+ excluded_entity_types=excluded_entity_types,
+ edge_types=edge_types,
+ custom_extraction_instructions=custom_extraction_instructions,
+ )
+
+ return await _extract_nodes_and_edges_bulk_separate(
+ clients,
+ episode_tuples,
+ edge_type_map,
+ entity_types=entity_types,
+ excluded_entity_types=excluded_entity_types,
+ edge_types=edge_types,
+ custom_extraction_instructions=custom_extraction_instructions,
+ )
+
+
+async def _extract_nodes_and_edges_bulk_combined(
+ clients: GraphitiClients,
+ episode_tuples: list[tuple[EpisodicNode, list[EpisodicNode]]],
+ edge_type_map: dict[tuple[str, str], list[str]],
+ entity_types: dict[str, type[BaseModel]] | None = None,
+ excluded_entity_types: list[str] | None = None,
+ edge_types: dict[str, type[BaseModel]] | None = None,
+ custom_extraction_instructions: str | None = None,
+) -> tuple[list[list[EntityNode]], list[list[EntityEdge]]]:
+ """Combined extraction: single LLM call per episode for both nodes and edges."""
+ from graphiti_core.utils.maintenance.combined_extraction import (
+ extract_nodes_and_edges as extract_combined,
+ )
+
+ results = await semaphore_gather(
+ *[
+ extract_combined(
+ clients,
+ episode,
+ previous_episodes,
+ entity_types=entity_types,
+ excluded_entity_types=excluded_entity_types,
+ edge_type_map=edge_type_map,
+ edge_types=edge_types,
+ custom_extraction_instructions=custom_extraction_instructions,
+ )
+ for episode, previous_episodes in episode_tuples
+ ]
+ )
+
+ nodes_bulk = [nodes for nodes, _, _ in results]
+ edges_bulk = [edges for _, edges, _ in results]
+ return nodes_bulk, edges_bulk
+
+
+async def _extract_nodes_and_edges_bulk_separate(
+ clients: GraphitiClients,
+ episode_tuples: list[tuple[EpisodicNode, list[EpisodicNode]]],
+ edge_type_map: dict[tuple[str, str], list[str]],
+ entity_types: dict[str, type[BaseModel]] | None = None,
+ excluded_entity_types: list[str] | None = None,
+ edge_types: dict[str, type[BaseModel]] | None = None,
+ custom_extraction_instructions: str | None = None,
+) -> tuple[list[list[EntityNode]], list[list[EntityEdge]]]:
+ """Separate extraction: two sequential LLM calls per episode (legacy)."""
extracted_results: list[tuple[list[EntityNode], dict[str, list[int]]]] = await semaphore_gather(
*[
extract_nodes(
diff --git a/graphiti_core/utils/maintenance/combined_extraction.py b/graphiti_core/utils/maintenance/combined_extraction.py
new file mode 100644
index 000000000..f08510a28
--- /dev/null
+++ b/graphiti_core/utils/maintenance/combined_extraction.py
@@ -0,0 +1,302 @@
+"""
+Copyright 2024, Zep Software, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import logging
+from datetime import datetime
+from time import time
+
+from pydantic import BaseModel
+
+from graphiti_core.edges import EntityEdge
+from graphiti_core.graphiti_types import GraphitiClients
+from graphiti_core.llm_client.config import ModelSize
+from graphiti_core.nodes import EntityNode, EpisodicNode
+from graphiti_core.prompts import prompt_library
+from graphiti_core.prompts.extract_edges import BatchEdgeTimestamps
+from graphiti_core.prompts.extract_nodes_and_edges import CombinedExtraction
+from graphiti_core.utils.datetime_utils import ensure_utc, utc_now
+from graphiti_core.utils.maintenance.dedup_helpers import _normalize_string_exact
+from graphiti_core.utils.maintenance.node_operations import (
+ _build_entity_types_context,
+ _collapse_exact_duplicate_extracted_nodes,
+)
+from graphiti_core.utils.text_utils import concatenate_episodes
+
+logger = logging.getLogger(__name__)
+
+
+async def extract_nodes_and_edges(
+ clients: GraphitiClients,
+ episode: EpisodicNode | list[EpisodicNode],
+ previous_episodes: list[EpisodicNode],
+ entity_types: dict[str, type[BaseModel]] | None = None,
+ excluded_entity_types: list[str] | None = None,
+ edge_type_map: dict[tuple[str, str], list[str]] | None = None,
+ edge_types: dict[str, type[BaseModel]] | None = None,
+ custom_extraction_instructions: str | None = None,
+) -> tuple[list[EntityNode], list[EntityEdge], dict[str, list[int]]]:
+ """Extract entity nodes and relationship facts in a single LLM call.
+
+ This combined extraction produces better results than separate node+edge
+ extraction because the model can see both tasks simultaneously, ensuring
+ every entity has at least one connecting fact and reducing orphaned nodes.
+
+ Parameters
+ ----------
+ clients : GraphitiClients
+ LLM and embedder clients.
+ episode : EpisodicNode | list[EpisodicNode]
+ A single episode or a list of episodes to extract from.
+ previous_episodes : list[EpisodicNode]
+ Prior episodes for context (not extracted from).
+ entity_types : dict | None
+ Custom entity type definitions.
+ excluded_entity_types : list[str] | None
+ Entity types to exclude from extraction.
+ edge_type_map : dict | None
+ Mapping of edge type signatures (unused in combined prompt but kept for API compat).
+ edge_types : dict | None
+ Custom edge type definitions (unused in combined prompt but kept for API compat).
+ custom_extraction_instructions : str | None
+ Additional extraction instructions.
+
+ Returns
+ -------
+ tuple[list[EntityNode], list[EntityEdge], dict[str, list[int]]]
+ A tuple of (nodes, edges, node_episode_index_map) where
+ node_episode_index_map maps node UUID to 0-indexed episode positions.
+ """
+ episodes = episode if isinstance(episode, list) else [episode]
+ primary_episode = episodes[0]
+
+ if edge_types:
+ logger.debug(
+ 'Combined extraction does not use custom edge types; %d edge type(s) will be ignored',
+ len(edge_types),
+ )
+ if edge_type_map:
+ logger.debug(
+ 'Combined extraction does not use edge_type_map; %d mapping(s) will be ignored',
+ len(edge_type_map),
+ )
+
+ start = time()
+ llm_client = clients.llm_client
+
+ # Build entity types context
+ entity_types_context = _build_entity_types_context(entity_types)
+
+ # Build context for the combined prompt
+ context = {
+ 'episode_content': concatenate_episodes(episodes),
+ 'previous_episodes': [
+ {
+ 'content': ep.content,
+ 'timestamp': ep.valid_at.isoformat() if ep.valid_at else None,
+ }
+ for ep in previous_episodes
+ ],
+ 'custom_extraction_instructions': custom_extraction_instructions or '',
+ 'entity_types': entity_types_context,
+ }
+
+ # Single LLM call for combined extraction
+ llm_response = await llm_client.generate_response(
+ prompt_library.extract_nodes_and_edges.extract_message(context),
+ response_model=CombinedExtraction,
+ group_id=primary_episode.group_id,
+ prompt_name='extract_nodes_and_edges.extract_message',
+ )
+ response_object = CombinedExtraction(**llm_response)
+
+ end = time()
+ logger.debug(
+ f'Combined extraction: {len(response_object.extracted_entities)} entities, '
+ f'{len(response_object.edges)} edges in {(end - start) * 1000:.0f} ms'
+ )
+
+ # --- Process nodes ---
+
+ # Filter empty names
+ filtered_entities = [e for e in response_object.extracted_entities if e.name.strip()]
+
+ # Convert CombinedEntity objects to EntityNode objects (no episode attribution yet —
+ # that is derived from edges below).
+ extracted_nodes: list[EntityNode] = []
+ for entity in filtered_entities:
+ type_id = entity.entity_type_id
+ if 0 <= type_id < len(entity_types_context):
+ entity_type_name = entity_types_context[type_id].get('entity_type_name')
+ else:
+ entity_type_name = 'Entity'
+
+ if excluded_entity_types and entity_type_name in excluded_entity_types:
+ logger.debug(f'Excluding entity of type "{entity_type_name}"')
+ continue
+
+ labels: list[str] = list({'Entity', str(entity_type_name)})
+ new_node = EntityNode(
+ name=entity.name,
+ group_id=primary_episode.group_id,
+ labels=labels,
+ summary='',
+ created_at=utc_now(),
+ )
+ extracted_nodes.append(new_node)
+
+ # Collapse exact-duplicate nodes (same normalized name).
+ # Temporarily use an empty map — real attribution comes from edges below.
+ node_episode_index_map: dict[str, list[int]] = {}
+ extracted_nodes = _collapse_exact_duplicate_extracted_nodes(
+ extracted_nodes, node_episode_index_map
+ )
+
+ # --- Process edges ---
+
+ # Build normalized name-to-node map so case/whitespace differences don't drop edges
+ name_to_node: dict[str, EntityNode] = {
+ _normalize_string_exact(node.name): node for node in extracted_nodes
+ }
+
+ extracted_edges: list[EntityEdge] = []
+ for edge_data in response_object.edges:
+ # Validate source and target exist in extracted nodes (case-insensitive)
+ source_node = name_to_node.get(_normalize_string_exact(edge_data.source_entity_name))
+ target_node = name_to_node.get(_normalize_string_exact(edge_data.target_entity_name))
+
+ if source_node is None:
+ logger.debug(
+ f'Skipping edge: source "{edge_data.source_entity_name}" not in extracted nodes'
+ )
+ continue
+ if target_node is None:
+ logger.debug(
+ f'Skipping edge: target "{edge_data.target_entity_name}" not in extracted nodes'
+ )
+ continue
+
+ if not edge_data.fact.strip():
+ logger.debug('Skipping edge with empty fact')
+ continue
+
+ # Map episode_indices (0-indexed) to episode UUIDs
+ edge_episode_uuids: list[str] = []
+ for idx in edge_data.episode_indices:
+ if 0 <= idx < len(episodes):
+ edge_episode_uuids.append(episodes[idx].uuid)
+ if not edge_episode_uuids:
+ edge_episode_uuids = [ep.uuid for ep in episodes]
+
+ # Use the first attributed episode's timestamp as the reference time
+ edge_reference_time = (
+ episodes[edge_data.episode_indices[0]].valid_at
+ if edge_data.episode_indices and 0 <= edge_data.episode_indices[0] < len(episodes)
+ else primary_episode.valid_at
+ )
+
+ edge = EntityEdge(
+ source_node_uuid=source_node.uuid,
+ target_node_uuid=target_node.uuid,
+ name=edge_data.relation_type,
+ group_id=primary_episode.group_id,
+ fact=edge_data.fact,
+ episodes=edge_episode_uuids,
+ created_at=utc_now(),
+ reference_time=edge_reference_time,
+ )
+ extracted_edges.append(edge)
+
+ # --- Extract timestamps for all edges in a single batch LLM call ---
+ if extracted_edges:
+ facts_with_ref = [
+ {
+ 'fact': edge.fact,
+ 'reference_time': (
+ edge.reference_time.isoformat() if edge.reference_time else 'unknown'
+ ),
+ }
+ for edge in extracted_edges
+ ]
+ try:
+ ts_response = await llm_client.generate_response(
+ prompt_library.extract_edges.extract_timestamps_batch({'facts': facts_with_ref}),
+ response_model=BatchEdgeTimestamps,
+ model_size=ModelSize.small,
+ prompt_name='extract_edges.extract_timestamps_batch',
+ )
+ batch_timestamps = BatchEdgeTimestamps(**ts_response)
+ if len(batch_timestamps.timestamps) != len(extracted_edges):
+ logger.warning(
+ 'Batch timestamp count mismatch: got %d timestamps for %d edges',
+ len(batch_timestamps.timestamps),
+ len(extracted_edges),
+ )
+ for edge, ts in zip(extracted_edges, batch_timestamps.timestamps, strict=False):
+ if ts.valid_at:
+ try:
+ edge.valid_at = ensure_utc(
+ datetime.fromisoformat(ts.valid_at.replace('Z', '+00:00'))
+ )
+ except ValueError:
+ logger.debug(f'Error parsing valid_at: {ts.valid_at}')
+ if ts.invalid_at:
+ try:
+ edge.invalid_at = ensure_utc(
+ datetime.fromisoformat(ts.invalid_at.replace('Z', '+00:00'))
+ )
+ except ValueError:
+ logger.debug(f'Error parsing invalid_at: {ts.invalid_at}')
+ except Exception:
+ logger.warning(
+ 'Failed to extract batch timestamps for %d edges',
+ len(extracted_edges),
+ exc_info=True,
+ )
+
+ # --- Derive node episode attribution from edges and drop orphans ---
+ # Each node inherits the episode indices of every edge it participates in.
+ # Nodes with no connecting edges are dropped — they have no retrievable facts.
+ episode_uuid_to_idx = {ep.uuid: i for i, ep in enumerate(episodes)}
+ connected_node_uuids: set[str] = set()
+ for edge in extracted_edges:
+ connected_node_uuids.add(edge.source_node_uuid)
+ connected_node_uuids.add(edge.target_node_uuid)
+
+ orphan_count = sum(1 for n in extracted_nodes if n.uuid not in connected_node_uuids)
+ if orphan_count:
+ logger.debug(
+ 'Dropping %d orphan node(s) with no connecting edges',
+ orphan_count,
+ )
+ extracted_nodes = [n for n in extracted_nodes if n.uuid in connected_node_uuids]
+
+ for edge in extracted_edges:
+ for node_uuid in (edge.source_node_uuid, edge.target_node_uuid):
+ edge_episode_positions = [
+ episode_uuid_to_idx[ep_uuid]
+ for ep_uuid in edge.episodes
+ if ep_uuid in episode_uuid_to_idx
+ ]
+ existing = node_episode_index_map.get(node_uuid, [])
+ merged = sorted(set(existing + edge_episode_positions))
+ node_episode_index_map[node_uuid] = merged
+
+ logger.debug(
+ f'Combined extraction final: {len(extracted_nodes)} nodes, '
+ f'{len(extracted_edges)} edges (from {len(response_object.edges)} raw)'
+ )
+
+ return extracted_nodes, extracted_edges, node_episode_index_map
diff --git a/graphiti_core/utils/maintenance/edge_operations.py b/graphiti_core/utils/maintenance/edge_operations.py
index 85f20d2db..45c05da43 100644
--- a/graphiti_core/utils/maintenance/edge_operations.py
+++ b/graphiti_core/utils/maintenance/edge_operations.py
@@ -36,7 +36,7 @@
from graphiti_core.prompts import prompt_library
from graphiti_core.prompts.dedupe_edges import EdgeDuplicate
from graphiti_core.prompts.extract_edges import Edge as ExtractedEdge
-from graphiti_core.prompts.extract_edges import ExtractedEdges
+from graphiti_core.prompts.extract_edges import EdgeTimestamps, ExtractedEdges
from graphiti_core.search.search import search
from graphiti_core.search.search_config import SearchResults
from graphiti_core.search.search_config_recipes import EDGE_HYBRID_SEARCH_RRF
@@ -171,12 +171,12 @@ async def extract_edges(
if len(episodes) > 1:
episode_attribution = (
'\n8. **Episode Attribution**: The CURRENT_MESSAGE contains multiple episodes labeled '
- '[Episode 1], [Episode 2], etc. Each episode header includes a timestamp indicating '
+ '[Episode 0], [Episode 1], etc. Each episode header includes a timestamp indicating '
'when that episode occurred. Use the per-episode timestamp to resolve relative time '
'mentions within each episode rather than relying solely on REFERENCE_TIME. '
'For each extracted fact, set `episode_indices` '
- 'to the list of episode numbers that the fact was derived from. '
- 'A fact sourced from Episodes 1 and 2 should have `episode_indices: [1, 2]`.'
+ 'to the 0-based list of episode numbers that the fact was derived from. '
+ 'A fact sourced from Episodes 0 and 1 should have `episode_indices: [0, 1]`.'
)
# Prepare context for LLM
@@ -286,12 +286,12 @@ async def extract_edges(
except ValueError as e:
logger.warning(f'WARNING: Error parsing invalid_at date: {e}. Input: {invalid_at}')
- # Map episode_indices (1-indexed) to episode UUIDs.
+ # Map episode_indices (0-indexed) to episode UUIDs.
# Clamp indices to valid range and fall back to all episodes if empty.
edge_episode_uuids = []
for idx in edge_data.episode_indices:
- if 1 <= idx <= len(episodes):
- edge_episode_uuids.append(episodes[idx - 1].uuid)
+ if 0 <= idx < len(episodes):
+ edge_episode_uuids.append(episodes[idx].uuid)
if not edge_episode_uuids:
edge_episode_uuids = [ep.uuid for ep in episodes]
@@ -306,8 +306,8 @@ async def extract_edges(
valid_at=valid_at_datetime,
invalid_at=invalid_at_datetime,
reference_time=(
- episodes[edge_data.episode_indices[0] - 1].valid_at
- if edge_data.episode_indices and 1 <= edge_data.episode_indices[0] <= len(episodes)
+ episodes[edge_data.episode_indices[0]].valid_at
+ if edge_data.episode_indices and 0 <= edge_data.episode_indices[0] < len(episodes)
else primary_episode.valid_at
),
)
@@ -572,6 +572,53 @@ def resolve_edge_contradictions(
return invalidated_edges
+async def _extract_edge_timestamps(
+ llm_client: LLMClient,
+ edge: EntityEdge,
+ episode: EpisodicNode | None,
+) -> None:
+ """Extract valid_at / invalid_at timestamps for an edge via a lightweight LLM call.
+
+ Modifies the edge in place. Skips if the edge already has timestamps set
+ (e.g., from the extraction prompt in the separate-extraction path) or if
+ no reference time is available.
+ """
+ if edge.valid_at is not None or edge.invalid_at is not None:
+ return
+
+ if episode is None or episode.valid_at is None:
+ return
+
+ context = {
+ 'fact': edge.fact,
+ 'reference_time': episode.valid_at.isoformat(),
+ }
+ try:
+ llm_response = await llm_client.generate_response(
+ prompt_library.extract_edges.extract_timestamps(context),
+ response_model=EdgeTimestamps,
+ model_size=ModelSize.small,
+ prompt_name='extract_edges.extract_timestamps',
+ )
+ timestamps = EdgeTimestamps(**llm_response)
+ if timestamps.valid_at:
+ try:
+ edge.valid_at = ensure_utc(
+ datetime.fromisoformat(timestamps.valid_at.replace('Z', '+00:00'))
+ )
+ except ValueError:
+ logger.debug(f'Error parsing valid_at: {timestamps.valid_at}')
+ if timestamps.invalid_at:
+ try:
+ edge.invalid_at = ensure_utc(
+ datetime.fromisoformat(timestamps.invalid_at.replace('Z', '+00:00'))
+ )
+ except ValueError:
+ logger.debug(f'Error parsing invalid_at: {timestamps.invalid_at}')
+ except Exception:
+ logger.warning('Failed to extract timestamps for edge %s', edge.uuid, exc_info=True)
+
+
async def resolve_extracted_edge(
llm_client: LLMClient,
extracted_edge: EntityEdge,
@@ -603,7 +650,7 @@ async def resolve_extracted_edge(
The resolved edge, any duplicates, and edges to invalidate.
"""
if len(related_edges) == 0 and len(existing_edges) == 0:
- # Still extract custom attributes even when no dedup/invalidation is needed
+ # Still extract custom attributes and timestamps even when no dedup/invalidation is needed
edge_model = edge_type_candidates.get(extracted_edge.name) if edge_type_candidates else None
if edge_model is not None and len(edge_model.model_fields) != 0:
edge_attributes_context = {
@@ -619,6 +666,8 @@ async def resolve_extracted_edge(
)
extracted_edge.attributes = edge_attributes_response
+ await _extract_edge_timestamps(llm_client, extracted_edge, episode)
+
return extracted_edge, [], []
# Fast path: if the fact text and endpoints already exist verbatim, reuse the matching edge.
@@ -736,6 +785,10 @@ async def resolve_extracted_edge(
else:
resolved_edge.attributes = {}
+ # Extract timestamps for new edges (duplicated edges retain their existing timestamps)
+ if resolved_edge.uuid == extracted_edge.uuid:
+ await _extract_edge_timestamps(llm_client, resolved_edge, episode)
+
end = time()
logger.debug(
f'Resolved Edge: {extracted_edge.uuid} -> {resolved_edge.uuid}, in {(end - start) * 1000} ms'
diff --git a/graphiti_core/utils/maintenance/node_operations.py b/graphiti_core/utils/maintenance/node_operations.py
index c2eb4cca3..c661091c5 100644
--- a/graphiti_core/utils/maintenance/node_operations.py
+++ b/graphiti_core/utils/maintenance/node_operations.py
@@ -104,10 +104,10 @@ async def extract_nodes(
if len(episodes) > 1:
episode_attribution = (
'\n7. **Episode Attribution**: The content contains multiple episodes labeled '
- '[Episode 1], [Episode 2], etc. Each episode header includes a timestamp indicating '
+ '[Episode 0], [Episode 1], etc. Each episode header includes a timestamp indicating '
'when that episode occurred. For each extracted entity, set `episode_indices` '
- 'to the list of episode numbers where that entity is mentioned. '
- 'An entity appearing in Episodes 1 and 3 should have `episode_indices: [1, 3]`.'
+ 'to the 0-based list of episode numbers where that entity is mentioned. '
+ 'An entity appearing in Episodes 0 and 2 should have `episode_indices: [0, 2]`.'
)
# Build base context
@@ -320,9 +320,9 @@ def _create_entity_nodes(
)
extracted_nodes.append(new_node)
- # Map node to 0-indexed episode positions (LLM returns 1-indexed).
+ # Map node to episode positions (LLM already returns 0-indexed indices).
# Clamp to valid range; fall back to all episodes if empty.
- indices = [i - 1 for i in extracted_entity.episode_indices if 1 <= i <= len(episodes)]
+ indices = [i for i in extracted_entity.episode_indices if 0 <= i < len(episodes)]
if not indices:
indices = list(range(len(episodes)))
node_episode_index_map[new_node.uuid] = indices
diff --git a/graphiti_core/utils/text_utils.py b/graphiti_core/utils/text_utils.py
index fce2eb1c3..1dc19b7ea 100644
--- a/graphiti_core/utils/text_utils.py
+++ b/graphiti_core/utils/text_utils.py
@@ -69,7 +69,7 @@ def concatenate_episodes(episodes: list[EpisodicNode]) -> str:
if len(episodes) == 1:
return episodes[0].content
parts: list[str] = []
- for i, ep in enumerate(episodes, start=1):
+ for i, ep in enumerate(episodes):
timestamp = ep.valid_at.isoformat() if ep.valid_at else 'unknown'
parts.append(f'[Episode {i}] (timestamp: {timestamp})\n{ep.content}')
return '\n\n'.join(parts)
diff --git a/tests/utils/test_concatenate_episodes.py b/tests/utils/test_concatenate_episodes.py
index a89d5a789..78c78ec83 100644
--- a/tests/utils/test_concatenate_episodes.py
+++ b/tests/utils/test_concatenate_episodes.py
@@ -25,9 +25,9 @@ def test_multiple_episodes_adds_headers_with_timestamps(self):
_make_episode('Third', datetime(2025, 1, 1, 10, 10, 0, tzinfo=timezone.utc)),
]
result = concatenate_episodes(eps)
- assert '[Episode 1] (timestamp: 2025-01-01T10:00:00+00:00)\nFirst' in result
- assert '[Episode 2] (timestamp: 2025-01-01T10:05:00+00:00)\nSecond' in result
- assert '[Episode 3] (timestamp: 2025-01-01T10:10:00+00:00)\nThird' in result
+ assert '[Episode 0] (timestamp: 2025-01-01T10:00:00+00:00)\nFirst' in result
+ assert '[Episode 1] (timestamp: 2025-01-01T10:05:00+00:00)\nSecond' in result
+ assert '[Episode 2] (timestamp: 2025-01-01T10:10:00+00:00)\nThird' in result
def test_multiple_episodes_separated_by_blank_line(self):
ts = datetime(2025, 6, 15, 8, 0, 0, tzinfo=timezone.utc)
@@ -35,7 +35,7 @@ def test_multiple_episodes_separated_by_blank_line(self):
result = concatenate_episodes(eps)
ts_str = ts.isoformat()
assert result == (
- f'[Episode 1] (timestamp: {ts_str})\nA\n\n[Episode 2] (timestamp: {ts_str})\nB'
+ f'[Episode 0] (timestamp: {ts_str})\nA\n\n[Episode 1] (timestamp: {ts_str})\nB'
)
def test_single_episode_no_header(self):