Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,51 @@ if data.get('citations'):
<td><code>sentence-transformers/all-mpnet-base-v2</code></td>
<td>Embedding model</td>
</tr>
<tr>
<td><code>RERANK_ENABLED</code></td>
<td><code>true</code></td>
<td>Enable lightweight post-retrieval reranking</td>
</tr>
<tr>
<td><code>RERANK_CANDIDATE_MULTIPLIER</code></td>
<td><code>3</code></td>
<td>Multiplier for initial candidate pool before reranking</td>
</tr>
<tr>
<td><code>RERANK_SIMILARITY_WEIGHT</code></td>
<td><code>0.7</code></td>
<td>Weight for vector similarity score in final rerank score</td>
</tr>
<tr>
<td><code>RERANK_KEYWORD_WEIGHT</code></td>
<td><code>0.2</code></td>
<td>Weight for query term overlap with chunk content</td>
</tr>
<tr>
<td><code>RERANK_METADATA_WEIGHT</code></td>
<td><code>0.1</code></td>
<td>Weight for query term overlap with file path and citation URL</td>
</tr>
<tr>
<td><code>RERANK_MAX_CANDIDATES</code></td>
<td><code>50</code></td>
<td>Upper bound for candidate pool size fetched before reranking</td>
</tr>
<tr>
<td><code>RERANK_MIN_TOKEN_LEN</code></td>
<td><code>3</code></td>
<td>Minimum token length used for query/content term overlap scoring</td>
</tr>
<tr>
<td><code>RERANK_DEBUG_LOG</code></td>
<td><code>false</code></td>
<td>Enable before/after reranking logs with component scores</td>
</tr>
<tr>
<td><code>RERANK_LOG_TOP_N</code></td>
<td><code>5</code></td>
<td>Number of documents to show in reranking debug logs</td>
</tr>
</tbody>
</table>

Expand Down
140 changes: 140 additions & 0 deletions eval_retrieval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import argparse
import os
from typing import Any, Dict, List

from pymilvus import Collection, connections
from sentence_transformers import SentenceTransformer

from shared.reranking import candidate_pool_limit, load_rerank_config_from_env, rerank_documents


# Default evaluation queries used when --queries is not supplied.
DEFAULT_QUERIES = [
    "How do I create a Kubeflow Pipeline?",
    "How to deploy an InferenceService in KServe?",
    "Kubeflow Notebook setup requirements",
]


def build_args(argv: "List[str] | None" = None) -> argparse.Namespace:
    """Parse command-line options for the retrieval evaluation script.

    Args:
        argv: Optional explicit argument list. Defaults to ``None``, which
            makes ``argparse`` fall back to ``sys.argv[1:]`` exactly as
            before; passing a list makes the parser testable without
            patching ``sys.argv``.

    Returns:
        Namespace with ``queries`` (list of query strings), ``top_k``
        (final number of results to keep), and ``show_content_chars``
        (preview length per result).
    """
    parser = argparse.ArgumentParser(description="Evaluate retrieval results before/after reranking.")
    parser.add_argument("--queries", nargs="*", default=DEFAULT_QUERIES, help="Queries to evaluate")
    parser.add_argument("--top-k", type=int, default=5, help="Final top-k results to keep")
    parser.add_argument(
        "--show-content-chars",
        type=int,
        default=180,
        help="Number of content characters to print per result",
    )
    return parser.parse_args(argv)


def _print_docs(title: str, docs: List[Dict[str, Any]], show_content_chars: int) -> None:
print(f"\n{title}")
if not docs:
print(" (no results)")
return

for idx, doc in enumerate(docs, start=1):
content = (doc.get("content_text") or "").replace("\n", " ").strip()
if len(content) > show_content_chars:
content = content[:show_content_chars] + "..."

print(
f" {idx}. score={doc.get('rerank_score', doc.get('similarity', 0.0)):.4f} "
f"sim={doc.get('similarity', 0.0):.4f} "
f"keyword={doc.get('keyword_score', 0.0):.4f} "
f"metadata={doc.get('metadata_score', 0.0):.4f}"
)
print(f" file={doc.get('file_path', '')}")
print(f" url={doc.get('citation_url', '')}")
print(f" text={content}")


def retrieve_candidates(
    query: str,
    model: SentenceTransformer,
    collection: Collection,
    top_k: int,
    candidate_limit: int,
    vector_field: str,
) -> List[Dict[str, Any]]:
    """Embed *query* and fetch the raw candidate pool from Milvus.

    The fetch size is governed by *candidate_limit*; *top_k* is accepted
    for interface symmetry with the caller but is not used here.

    Returns a list of dicts with ``similarity``, ``file_path``,
    ``citation_url`` and ``content_text`` keys, in the order Milvus
    returned the hits.
    """
    embedded = model.encode(query).tolist()

    hits = collection.search(
        data=[embedded],
        anns_field=vector_field,
        param={"metric_type": "COSINE", "params": {"nprobe": 32}},
        limit=candidate_limit,
        output_fields=["file_path", "content_text", "citation_url"],
    )

    candidates: List[Dict[str, Any]] = []
    for hit in hits[0]:
        record = hit.entity
        # NOTE(review): with metric_type COSINE, Milvus reports larger
        # distance values for closer matches — confirm whether
        # `1.0 - hit.distance` is the intended mapping here or whether
        # `hit.distance` should be used directly as the similarity.
        candidates.append(
            {
                "similarity": 1.0 - float(hit.distance),
                "file_path": record.get("file_path"),
                "citation_url": record.get("citation_url"),
                "content_text": record.get("content_text") or "",
            }
        )

    return candidates


def main() -> None:
    """Entry point: connect to Milvus, run each query, and print the
    retrieved documents both before and after reranking so the two
    orderings can be compared side by side.
    """
    args = build_args()

    host = os.getenv("MILVUS_HOST", "my-release-milvus.docs-agent.svc.cluster.local")
    port = os.getenv("MILVUS_PORT", "19530")
    collection_name = os.getenv("MILVUS_COLLECTION", "docs_rag")
    vector_field = os.getenv("MILVUS_VECTOR_FIELD", "vector")
    model_name = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-mpnet-base-v2")

    rerank_config = load_rerank_config_from_env()
    top_k = max(1, int(args.top_k))
    candidate_limit = candidate_pool_limit(top_k, rerank_config)

    print("Retrieval evaluation configuration")
    print(f"- collection: {collection_name}")
    print(f"- top_k: {top_k}")
    print(f"- candidate_limit: {candidate_limit}")
    print(f"- rerank_enabled: {rerank_config.enabled}")

    connections.connect(alias="default", host=host, port=port)
    try:
        collection = Collection(collection_name)
        collection.load()
        embedder = SentenceTransformer(model_name)

        for query in args.queries:
            print("\n" + "=" * 100)
            print(f"Query: {query}")

            candidates = retrieve_candidates(
                query=query,
                model=embedder,
                collection=collection,
                top_k=top_k,
                candidate_limit=candidate_limit,
                vector_field=vector_field,
            )

            # "Before" is simply the vector-similarity order truncated to
            # top_k; "after" applies the configured lightweight reranking
            # over the full candidate pool.
            ranked = rerank_documents(
                query=query,
                docs=candidates,
                config=rerank_config,
                top_k=top_k,
            )

            _print_docs("Before reranking", candidates[:top_k], args.show_content_chars)
            _print_docs("After reranking", ranked, args.show_content_chars)
    finally:
        # Always release the Milvus connection, even when a query fails.
        connections.disconnect(alias="default")


if __name__ == "__main__":
    main()
Binary file added git_logs_temp.txt
Binary file not shown.
12 changes: 12 additions & 0 deletions git_msg.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
bd1725fdd3d161db68f327df783abe7f8d464cf7
Ayush-kathil
Add thread-safe model init and Milvus search
Introduce thread-safe lazy initialization for SentenceTransformer in server-https/app.py and server/app.py using a lock and double-checked locking, with timing/info logs to avoid repeated model loading. Add threading and time imports. In server/app.py add milvus_search(query, top_k) to perform Milvus connection, load collection, encode query with the cached model, execute search, format results (similarity, file_path, citation_url, truncated content_text), handle errors, and ensure disconnect. These changes reduce initialization overhead and centralize Milvus search logic.

---
f05614a123776a4009d7104125931289d23bcdcc
Ayush Gupta
fix(server-https): preserve multi-hop citations in stream_llm_response
Signed-off-by: Ayush-kathil <kathilshiva@gmail.com>

---
107 changes: 89 additions & 18 deletions kagent-feast-mcp/mcp-server/server.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,67 @@
from fastmcp import FastMCP
import logging
import os
import sys
import threading
from pathlib import Path
from typing import Any, Dict, List
from pymilvus import MilvusClient
from sentence_transformers import SentenceTransformer

MILVUS_URI = "http://milvus.<YOUR_NAMESPACE>.svc.cluster.local:19530"
MILVUS_USER = "root"
MILVUS_PASSWORD = "Milvus"
COLLECTION_NAME = "kubeflow_docs_docs_rag"
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
PORT = 8000
REPO_ROOT = Path(__file__).resolve().parents[2]
if str(REPO_ROOT) not in sys.path:
sys.path.append(str(REPO_ROOT))

from shared.reranking import candidate_pool_limit, load_rerank_config_from_env, rerank_documents

MILVUS_URI = os.getenv("MILVUS_URI", "http://milvus.<YOUR_NAMESPACE>.svc.cluster.local:19530")
MILVUS_USER = os.getenv("MILVUS_USER", "root")
MILVUS_PASSWORD = os.getenv("MILVUS_PASSWORD", "Milvus")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "kubeflow_docs_docs_rag")
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-mpnet-base-v2")
PORT = int(os.getenv("PORT", "8000"))
RERANK_CONFIG = load_rerank_config_from_env()

mcp = FastMCP("Kubeflow Docs MCP Server")
logger = logging.getLogger(__name__)
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())

model: SentenceTransformer = None
client: MilvusClient = None
_initialized = False
_init_lock = threading.Lock()


def _init():
global model, client
if model is None:
model = SentenceTransformer(EMBEDDING_MODEL)
if client is None:
client = MilvusClient(uri=MILVUS_URI, user=MILVUS_USER, password=MILVUS_PASSWORD)
"""Initialize shared model/client exactly once.

Synchronization strategy:
- Fast path: return immediately after successful initialization.
- Slow path: take a process-local lock and re-check state (double-checked locking).

This guarantees that concurrent callers block until initialization completes,
and all callers observe the same initialized instances.
"""
global model, client, _initialized

if _initialized:
return

with _init_lock:
if _initialized:
return

logger.info("Initializing shared MCP resources")

# Build local instances first, then publish atomically under the lock.
local_model = SentenceTransformer(EMBEDDING_MODEL)
local_client = MilvusClient(uri=MILVUS_URI, user=MILVUS_USER, password=MILVUS_PASSWORD)

model = local_model
client = local_client
_initialized = True

logger.info("Shared MCP resources initialized")


@mcp.tool()
Expand All @@ -37,24 +78,54 @@ def search_kubeflow_docs(query: str, top_k: int = 5) -> str:
_init()

embedding = model.encode(query).tolist()
requested_top_k = max(1, int(top_k))
candidate_limit = candidate_pool_limit(requested_top_k, RERANK_CONFIG)

hits = client.search(
collection_name=COLLECTION_NAME,
data=[embedding],
limit=top_k,
limit=candidate_limit,
output_fields=["content_text", "citation_url", "file_path"],
)[0]

if not hits:
return "No results found for your query."

docs: List[Dict[str, Any]] = []
for hit in hits:
entity = hit.get("entity", {})
content_text = entity.get("content_text") or ""
if isinstance(content_text, str) and len(content_text) > 400:
content_text = content_text[:400] + "..."

docs.append(
{
"distance": float(hit.get("distance", 0.0)),
"similarity": 1.0 - float(hit.get("distance", 0.0)),
"file_path": entity.get("file_path"),
"citation_url": entity.get("citation_url"),
"content_text": content_text,
}
)

selected_docs = rerank_documents(
query=query,
docs=docs,
config=RERANK_CONFIG,
top_k=requested_top_k,
logger=logger,
log_prefix="mcp_search",
)

results = []
for i, hit in enumerate(hits, 1):
entity = hit["entity"]
entry = f"### Result {i} (score: {hit['distance']:.4f})"
entry += f"\n**Source:** {entity.get('citation_url', '')}"
entry += f"\n**File:** {entity.get('file_path', '')}"
entry += f"\n\n{entity.get('content_text', '')}\n"
for i, doc in enumerate(selected_docs, 1):
entry = f"### Result {i} (rerank_score: {doc.get('rerank_score', 0.0):.4f})"
entry += f"\n**Similarity:** {doc.get('similarity', 0.0):.4f}"
entry += f"\n**Keyword Score:** {doc.get('keyword_score', 0.0):.4f}"
entry += f"\n**Metadata Score:** {doc.get('metadata_score', 0.0):.4f}"
entry += f"\n**Source:** {doc.get('citation_url', '')}"
entry += f"\n**File:** {doc.get('file_path', '')}"
entry += f"\n\n{doc.get('content_text', '')}\n"
results.append(entry)

return "\n---\n".join(results)
Expand Down
Loading