def router(state: "AgentState") -> dict:
    """Routing node: classify the question as docs / code / both via keyword scores.

    Counts substring hits from two keyword lists (note: substring match, so
    "fix" also fires inside e.g. "prefix" — accepted for a cheap heuristic)
    and picks the higher-scoring route; ties fall back to "both". Each
    decision is appended to routing_logs.jsonl as future training data.

    Returns:
        Partial state update: {"route": "docs" | "code" | "both"}.
    """
    q = state["question"].lower()

    code_keywords = [
        "yaml", "manifest", "crd", "deployment", "service",
        "bug", "error", "crash", "fix", "issue", "exception",
        "code", "function", "class", "api", "webhook", "config"
    ]
    doc_keywords = [
        "what is", "how does", "explain", "overview", "concept",
        "architecture", "introduction", "guide", "tutorial"
    ]

    code_score = sum(1 for w in code_keywords if w in q)
    doc_score = sum(1 for w in doc_keywords if w in q)

    if code_score > doc_score:
        route = "code"
    elif doc_score > code_score:
        route = "docs"
    else:
        route = "both"

    # Emit routing log — future training data.
    # FIX: datetime.utcnow() is deprecated (Python 3.12) and returns a naive
    # timestamp; use an aware UTC timestamp instead.
    log = {
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "question": state["question"],
        "route": route,
        "doc_score": doc_score,
        "code_score": code_score
    }
    try:
        with open("routing_logs.jsonl", "a") as f:
            f.write(json.dumps(log) + "\n")
    except OSError as e:
        # Telemetry must never break routing itself — log and continue.
        print(f"[Router] failed to write routing log: {e}")

    print(f"[Router] route={route} doc_score={doc_score} code_score={code_score}")
    return {"route": route}
def synthesize(state: AgentState) -> dict:
    """Synthesis node: format retrieved chunks into a cited answer string.

    Joins every chunk's source/section header with its text, truncates the
    combined context to 800 characters, and appends the citation list.
    """
    sections = []
    for chunk in state["chunks"]:
        sections.append(
            f"Source: {chunk['source_url']}\nSection: {chunk['h1']} > {chunk['h2']}\n{chunk['text']}"
        )
    context = "\n\n".join(sections)

    # Stub — Phase 2 Part 3 replaces this with real LLM call
    answer = (
        f"Based on {len(state['chunks'])} retrieved chunks "
        f"from the {state['route']} index:\n\n"
        f"{context[:800]}\n\n"
        f"Citations: {', '.join(state['citations'])}"
    )
    return {"answer": answer}
# KServe InferenceService hosting the sentence-transformers embedding model
# used by the ingestion pipelines and the agent's retrieval step.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: embedding-service
  namespace: docs-agent
  annotations:
    # Keep at least one replica warm — embeddings sit on the hot path of
    # every query, so a cold start here would stall all retrieval.
    autoscaling.knative.dev/scaleToZero: "false"
spec:
  predictor:
    minReplicas: 1
    maxReplicas: 2
    model:
      modelFormat:
        name: huggingface
      runtime: llm-runtime
      args:
        # Must match the model used at query time (all-mpnet-base-v2, 768-dim)
        # or stored vectors and query vectors become incomparable.
        - --model_id=sentence-transformers/all-mpnet-base-v2
        - --backend=huggingface
      resources:
        requests:
          cpu: "2"
          memory: "4Gi"
        limits:
          cpu: "4"
          memory: "8Gi"
# KServe InferenceService running the Llama 3.1 8B Instruct model on vLLM.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: llama-service
  namespace: docs-agent
  annotations:
    # GPU capacity is scarce: allow scale-to-zero after 5 minutes idle, but
    # start with one replica so the first request does not hit a cold start.
    autoscaling.knative.dev/scaleToZero: "true"
    autoscaling.knative.dev/scaleToZeroPodRetentionPeriod: "5m"
    autoscaling.knative.dev/initialScale: "1"
    autoscaling.knative.dev/scaleUpDelay: "0s"
spec:
  predictor:
    minReplicas: 0
    maxReplicas: 1
    model:
      modelFormat:
        name: huggingface
      runtime: llm-runtime
      args:
        - --model_id=RedHatAI/Llama-3.1-8B-Instruct
        - --backend=vllm
        - --max-model-len=32768
        - --gpu-memory-utilization=0.90
        # Emit llama3-style JSON tool calls so the agent can use function calling.
        - --enable-auto-tool-choice
        - --tool-call-parser=llama3_json
      env:
        # Required to pull the gated Llama weights from Hugging Face.
        - name: HF_TOKEN
          valueFrom:
            secretKeyRef:
              name: huggingface-secret
              key: token
      resources:
        requests:
          cpu: "4"
          memory: "16Gi"
          nvidia.com/gpu: "1"
        limits:
          cpu: "6"
          memory: "24Gi"
          nvidia.com/gpu: "1"
@app.get("/health")
async def health():
    """Health probe endpoint hit by the deployment's readiness/liveness checks."""
    payload = {"status": "ok", "version": "1.0.0"}
    return payload
async def stream_response(question: str):
    """Yield SSE frames for one question: status → content words → citations → done.

    Each frame is "data: <json>\n\n" with a "type" field of
    "status", "content", "citations", or "done".
    """
    # Send thinking update
    yield f"data: {json.dumps({'type': 'status', 'content': 'Analyzing question...'})}\n\n"
    await asyncio.sleep(0.1)

    # FIX: .invoke() is a blocking synchronous call (embedding + Milvus
    # search); running it inline would freeze the event loop and stall every
    # concurrent request. Run it on a worker thread instead.
    result = await asyncio.to_thread(langgraph_agent.invoke, {
        "question": question,
        "route": "",
        "chunks": [],
        "citations": [],
        "answer": "",
        "messages": []
    })

    # Send route decision
    route_msg = f"Searching {result['route']} index..."
    yield f"data: {json.dumps({'type': 'status', 'content': route_msg})}\n\n"
    await asyncio.sleep(0.1)

    # Stream answer word by word, yielding control to the loop every 10 words
    words = result["answer"].split()
    for i, word in enumerate(words):
        chunk = word + " "
        yield f"data: {json.dumps({'type': 'content', 'content': chunk})}\n\n"
        if i % 10 == 0:
            await asyncio.sleep(0.01)

    # Send citations
    yield f"data: {json.dumps({'type': 'citations', 'citations': result['citations']})}\n\n"

    # Send done
    yield f"data: {json.dumps({'type': 'done'})}\n\n"
def search_index(collection_name: str, question: str, top_k: int = 3) -> list:
    """Embed `question` and run a top-k COSINE vector search on a Milvus collection.

    NOTE(review): duplicated verbatim in agent/core/graph.py — consider moving
    to one shared helper so the two copies cannot drift.

    Args:
        collection_name: Milvus collection to search ("docs_index" or "code_index").
        question: natural-language query, embedded with the module-level model.
        top_k: maximum number of hits to return.

    Returns:
        List of dicts with keys: text, source_url, h1, h2, score (rounded to 4 dp).
    """
    vector = embed_model.encode(question).tolist()
    collection = Collection(collection_name)
    # Milvus requires the collection to be loaded into memory before searching.
    collection.load()
    results = collection.search(
        data=[vector],
        anns_field="vector",
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=top_k,
        output_fields=["content_text", "source_url", "h1", "h2"]
    )
    chunks = []
    # results[0]: hits for the single query vector submitted above.
    for hit in results[0]:
        chunks.append({
            "text": hit.entity.get("content_text", ""),
            "source_url": hit.entity.get("source_url", ""),
            "h1": hit.entity.get("h1", ""),
            "h2": hit.entity.get("h2", ""),
            "score": round(hit.score, 4)
        })
    return chunks
def query_code_tool(question: str, top_k: int = 3) -> dict:
    """MCP tool: search the Kubeflow manifests code index.

    Use for: YAML configs, Kubernetes resources, debugging, API questions.
    Thin Context pattern: each result carries a ~600-char golden snippet plus
    a validation_link pointing at the file in kubeflow/manifests, so the
    caller's IDE agent can fetch the full file for broader context.
    """
    hits = search_index("code_index", question, top_k)

    results = [
        {
            "golden_snippet": hit["text"][:600],
            "validation_link": hit["source_url"],
            "kind": hit["h1"],
            "name": hit["h2"],
            "relevance_score": hit["score"],
        }
        for hit in hits
    ]

    return {
        "tool": "query_code",
        "question": question,
        "results": results,
        "count": len(results),
    }
+ """ + doc_results = query_docs_tool(question, top_k) + code_results = query_code_tool(question, top_k) + + return { + "tool": "query_both", + "question": question, + "docs_results": doc_results["results"], + "code_results": code_results["results"], + "total_count": doc_results["count"] + code_results["count"] + } + + +# ── MCP server ─────────────────────────────────────────── +def create_mcp_server(): + """ + Creates a FastMCP server exposing all three tools + as standard MCP endpoints callable from any IDE. + """ + from mcp.server.fastmcp import FastMCP + + mcp = FastMCP("kubeflow-docs-agent") + + @mcp.tool() + def query_docs(question: str, top_k: int = 3) -> str: + """Search Kubeflow documentation for conceptual questions.""" + result = query_docs_tool(question, top_k) + return json.dumps(result, indent=2) + + @mcp.tool() + def query_code(question: str, top_k: int = 3) -> str: + """Search Kubeflow manifests for YAML configs and code.""" + result = query_code_tool(question, top_k) + return json.dumps(result, indent=2) + + @mcp.tool() + def query_both(question: str, top_k: int = 2) -> str: + """Search both docs and code indexes simultaneously.""" + result = query_both_tool(question, top_k) + return json.dumps(result, indent=2) + + return mcp \ No newline at end of file diff --git a/docker-compose-milvus.yml b/docker-compose-milvus.yml new file mode 100644 index 0000000..c591f06 --- /dev/null +++ b/docker-compose-milvus.yml @@ -0,0 +1,39 @@ +version: '3.5' + +services: + etcd: + container_name: milvus-etcd + image: quay.io/coreos/etcd:v3.5.5 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + + minio: + container_name: milvus-minio + image: minio/minio:RELEASE.2023-03-13T19-46-17Z + environment: + MINIO_ACCESS_KEY: minioadmin + 
@dsl.component(
    base_image="python:3.11",
    packages_to_install=["gitpython"]
)
def clone_manifests(
    repo_url: str,
    branch: str,
    cloned_repo: dsl.Output[dsl.Dataset]
):
    """KFP component: shallow-clone the manifests repo and emit a file inventory.

    Writes JSON to `cloned_repo` with keys: repo_path, repo_url, branch, and
    files (absolute paths of every regular file found in the clone).

    NOTE(review): downstream components open the paths in `files` directly,
    i.e. they assume /tmp/manifests is visible on their own filesystem. That
    only holds if the steps share a pod/volume — confirm, because only the
    JSON inventory (not the clone itself) is passed as a KFP artifact.
    """
    import git
    import json
    from pathlib import Path

    print(f"Cloning {repo_url} branch={branch}")
    # depth=1: only the branch tip is needed for indexing, keeps the clone small.
    git.Repo.clone_from(
        repo_url,
        "/tmp/manifests",
        branch=branch,
        depth=1
    )

    all_files = [
        str(f) for f in Path("/tmp/manifests").rglob("*")
        if f.is_file()
    ]

    print(f"Found {len(all_files)} files")

    with open(cloned_repo.path, "w") as f:
        json.dump({
            "repo_path": "/tmp/manifests",
            "repo_url": repo_url,
            "branch": branch,
            "files": all_files
        }, f)
@dsl.component(
    base_image="python:3.11",
    packages_to_install=[]
)
def parse_python_ast(
    cloned_repo: dsl.Input[dsl.Dataset],
    parsed_python: dsl.Output[dsl.Dataset]
):
    """KFP component: extract function/class definitions from .py files via AST.

    Emits one unit per FunctionDef/AsyncFunctionDef/ClassDef containing the
    exact source segment, a GitHub deep link (#L<lineno>), and the first 300
    chars of the docstring. Snippets under 20 chars are dropped as noise.

    NOTE(review): ast.walk visits nested definitions too, so methods are
    indexed twice — inside their class snippet and standalone; confirm that
    is the intended granularity.

    NOTE(review): reads meta["files"] paths from the previous component's
    local filesystem — works only if the steps share a pod/volume.
    """
    import ast
    import json
    from pathlib import Path

    with open(cloned_repo.path) as f:
        meta = json.load(f)

    repo_path = meta["repo_path"]
    repo_url = meta["repo_url"]
    branch = meta["branch"]
    units = []

    for file_path in meta["files"]:
        if not file_path.endswith(".py"):
            continue
        try:
            source = Path(file_path).read_text(encoding="utf-8")
            tree = ast.parse(source)
            # Strip the clone prefix (either path separator) → repo-relative path.
            rel = file_path.replace(repo_path + "/", "").replace(repo_path + "\\", "")

            for node in ast.walk(tree):
                if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                    continue
                snippet = ast.get_source_segment(source, node) or ""
                if len(snippet) < 20:
                    continue
                docstring = ast.get_docstring(node) or ""
                units.append({
                    "text": snippet,
                    "file_path": rel,
                    "source_url": f"{repo_url}/blob/{branch}/{rel}#L{node.lineno}",
                    "kind": type(node).__name__,
                    "name": node.name,
                    "content_type": "python_definition",
                    "docstring": docstring[:300]
                })
        except Exception as e:
            # Best-effort: skip unreadable or syntactically invalid files.
            print(f"Skipping {file_path}: {e}")

    print(f"Parsed {len(units)} Python definitions")

    with open(parsed_python.path, "w") as f:
        json.dump(units, f)
base_image="pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime", + packages_to_install=["sentence-transformers"] +) +def embed_code( + parsed_yaml: dsl.Input[dsl.Dataset], + parsed_python: dsl.Input[dsl.Dataset], + embedded_code: dsl.Output[dsl.Dataset] +): + import json + import torch + from sentence_transformers import SentenceTransformer + + device = "cuda" if torch.cuda.is_available() else "cpu" + model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2", device=device) + print(f"Model loaded on {device}") + + # Load both parsed sources + units = [] + for input_file in [parsed_yaml.path, parsed_python.path]: + with open(input_file) as f: + units.extend(json.load(f)) + + print(f"Total units to embed: {len(units)}") + + records = [] + for unit in units: + text = unit["text"][:1000] + embedding = model.encode(text).tolist() + records.append({ + "text": unit["text"][:4000], + "file_path": unit["file_path"], + "source_url": unit["source_url"], + "kind": unit.get("kind", ""), + "name": unit.get("name", ""), + "content_type": unit.get("content_type", ""), + "embedding": embedding + }) + + print(f"Embedded {len(records)} units") + + with open(embedded_code.path, "w") as f: + json.dump(records, f) + + +@dsl.component( + base_image="python:3.11", + packages_to_install=["pymilvus"] +) +def store_code_index( + embedded_code: dsl.Input[dsl.Dataset], + milvus_host: str, + milvus_port: str, + collection_name: str +): + import json + from datetime import datetime + from pymilvus import ( + connections, utility, + FieldSchema, CollectionSchema, DataType, Collection + ) + + connections.connect("default", host=milvus_host, port=milvus_port) + + if utility.has_collection(collection_name): + utility.drop_collection(collection_name) + print(f"Dropped existing collection: {collection_name}") + + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=4000), + 
FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=500), + FieldSchema(name="source_url", dtype=DataType.VARCHAR, max_length=500), + FieldSchema(name="kind", dtype=DataType.VARCHAR, max_length=100), + FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=200), + FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50), + FieldSchema(name="last_updated", dtype=DataType.INT64), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), + ] + + schema = CollectionSchema(fields, "Kubeflow manifests code index") + collection = Collection(collection_name, schema) + print(f"Created collection: {collection_name}") + + with open(embedded_code.path) as f: + records = json.load(f) + + timestamp = int(datetime.now().timestamp()) + batch_size = 500 + + for i in range(0, len(records), batch_size): + batch = records[i:i + batch_size] + collection.insert([{ + "text": r["text"], + "file_path": r["file_path"], + "source_url": r["source_url"], + "kind": r["kind"], + "name": r["name"], + "content_type": r["content_type"], + "last_updated": timestamp, + "vector": r["embedding"] + } for r in batch]) + + collection.flush() + + index_params = { + "metric_type": "COSINE", + "index_type": "IVF_FLAT", + "params": {"nlist": min(1024, len(records))} + } + collection.create_index("vector", index_params) + collection.load() + print(f"Stored {collection.num_entities} records in {collection_name}") + + +@dsl.pipeline( + name="kubeflow-code-ingestion", + description="AST-based code ingestion for kubeflow/manifests" +) +def code_ingestion_pipeline( + repo_url: str = "https://github.com/kubeflow/manifests", + branch: str = "master", + milvus_host: str = "milvus-standalone-final.docs-agent.svc.cluster.local", + milvus_port: str = "19530", + collection_name: str = "code_index" +): + clone_task = clone_manifests(repo_url=repo_url, branch=branch) + + yaml_task = parse_yaml_resources( + cloned_repo=clone_task.outputs["cloned_repo"] + ) + + python_task = 
parse_python_ast( + cloned_repo=clone_task.outputs["cloned_repo"] + ) + + embed_task = embed_code( + parsed_yaml=yaml_task.outputs["parsed_yaml"], + parsed_python=python_task.outputs["parsed_python"] + ) + + store_task = store_code_index( + embedded_code=embed_task.outputs["embedded_code"], + milvus_host=milvus_host, + milvus_port=milvus_port, + collection_name=collection_name + ) + + +if __name__ == "__main__": + kfp.compiler.Compiler().compile( + pipeline_func=code_ingestion_pipeline, + package_path="code_ingestion_pipeline.yaml" + ) + print("Pipeline compiled to code_ingestion_pipeline.yaml") \ No newline at end of file diff --git a/pipelines/kubeflow-pipeline.py b/pipelines/kubeflow-pipeline.py index 5d7d618..32a2475 100644 --- a/pipelines/kubeflow-pipeline.py +++ b/pipelines/kubeflow-pipeline.py @@ -285,20 +285,43 @@ def chunk_and_embed( file_unique_id = f"{repo_name}:{file_data['path']}" # Create splitter - text_splitter = RecursiveCharacterTextSplitter( + # Split by markdown headers first — preserves document structure + from langchain.text_splitter import MarkdownHeaderTextSplitter + + headers_to_split_on = [ + ("#", "h1"), + ("##", "h2"), + ("###", "h3"), + ] + md_splitter = MarkdownHeaderTextSplitter( + headers_to_split_on=headers_to_split_on + ) + header_splits = md_splitter.split_text(content) + + # Then split each section into smaller chunks if needed + char_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len, separators=["\n\n", "\n", ". 
", " ", ""] ) - # Split into chunks - chunks = text_splitter.split_text(content) + chunks_with_meta = [] + for split in header_splits: + sub_chunks = char_splitter.split_text(split.page_content) + for sub in sub_chunks: + chunks_with_meta.append({ + "text": sub, + "h1": split.metadata.get("h1", ""), + "h2": split.metadata.get("h2", ""), + "h3": split.metadata.get("h3", ""), + }) - print(f"File: {file_data['path']} -> {len(chunks)} chunks (avg: {sum(len(c) for c in chunks)/len(chunks):.0f} chars)") + print(f"File: {file_data['path']} -> {len(chunks_with_meta)} chunks") # Create embeddings - for chunk_idx, chunk in enumerate(chunks): + for chunk_idx, chunk_meta in enumerate(chunks_with_meta): + chunk = chunk_meta["text"] embedding = model.encode(chunk).tolist() records.append({ 'file_unique_id': file_unique_id, @@ -308,8 +331,12 @@ def chunk_and_embed( 'citation_url': citation_url[:1024], 'chunk_index': chunk_idx, 'content_text': chunk[:2000], + 'h1': chunk_meta["h1"], + 'h2': chunk_meta["h2"], + 'h3': chunk_meta["h3"], 'embedding': embedding }) + print(f"Created {len(records)} total chunks") @@ -349,6 +376,9 @@ def store_milvus( FieldSchema(name="citation_url", dtype=DataType.VARCHAR, max_length=1024), FieldSchema(name="chunk_index", dtype=DataType.INT64), FieldSchema(name="content_text", dtype=DataType.VARCHAR, max_length=2000), + FieldSchema(name="h1", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="h2", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="h3", dtype=DataType.VARCHAR, max_length=300), FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), # Updated for all-mpnet-base-v2 FieldSchema(name="last_updated", dtype=DataType.INT64) ] @@ -373,9 +403,13 @@ def store_milvus( "citation_url": record["citation_url"], "chunk_index": record["chunk_index"], "content_text": record["content_text"], + "h1": record.get("h1", ""), # ← ADD + "h2": record.get("h2", ""), # ← ADD + "h3": record.get("h3", ""), # ← ADD "vector": 
record["embedding"], "last_updated": timestamp }) + if records: batch_size = 1000 @@ -410,7 +444,8 @@ def github_rag_pipeline( chunk_overlap: int = 100, milvus_host: str = "milvus-standalone-final.docs-agent.svc.cluster.local", milvus_port: str = "19530", - collection_name: str = "docs_rag" + collection_name: str = "docs_index" + ): # Download GitHub directory download_task = download_github_directory( diff --git a/routing_logs.jsonl b/routing_logs.jsonl new file mode 100644 index 0000000..c287c56 --- /dev/null +++ b/routing_logs.jsonl @@ -0,0 +1,6 @@ +{"timestamp": "2026-03-28T06:27:42.206133", "question": "What is Kubeflow Pipelines and how does it work?", "route": "docs", "doc_score": 2, "code_score": 0} +{"timestamp": "2026-03-28T06:27:42.915005", "question": "How do I fix a CrashLoopBackOff error in my KServe deployment?", "route": "code", "doc_score": 0, "code_score": 4} +{"timestamp": "2026-03-28T06:30:14.385607", "question": "What is Kubeflow Pipelines and how does it work?", "route": "docs", "doc_score": 2, "code_score": 0} +{"timestamp": "2026-03-28T06:30:14.519123", "question": "How do I fix a CrashLoopBackOff error in my KServe deployment?", "route": "code", "doc_score": 0, "code_score": 4} +{"timestamp": "2026-03-28T06:30:14.606369", "question": "Show me the webhook configuration YAML for notebooks", "route": "code", "doc_score": 0, "code_score": 3} +{"timestamp": "2026-03-28T16:40:35.342056", "question": "How do I install Kubeflow?", "route": "both", "doc_score": 0, "code_score": 0} diff --git a/seed_code_index.py b/seed_code_index.py new file mode 100644 index 0000000..1947f6b --- /dev/null +++ b/seed_code_index.py @@ -0,0 +1,147 @@ +from pymilvus import connections, utility, Collection +from pymilvus import FieldSchema, CollectionSchema, DataType +from sentence_transformers import SentenceTransformer + +print("Connecting to Milvus...") +connections.connect("default", host="localhost", port="19530") + +print("Loading embedding model...") +model = 
SentenceTransformer("sentence-transformers/all-mpnet-base-v2") + +# Drop if exists +if utility.has_collection("code_index"): + utility.drop_collection("code_index") + print("Dropped existing code_index") + +# Create code_index schema +fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="content_text", dtype=DataType.VARCHAR, max_length=4000), + FieldSchema(name="source_url", dtype=DataType.VARCHAR, max_length=500), + FieldSchema(name="kind", dtype=DataType.VARCHAR, max_length=100), + FieldSchema(name="name", dtype=DataType.VARCHAR, max_length=200), + FieldSchema(name="content_type", dtype=DataType.VARCHAR, max_length=50), + FieldSchema(name="h1", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="h2", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), +] +schema = CollectionSchema(fields, "Kubeflow manifests code index") +collection = Collection("code_index", schema) +print("Created code_index collection") + +# Sample Kubernetes resources +sample_code = [ + { + "text": """apiVersion: admissionregistration.k8s.io/v1 +kind: MutatingWebhookConfiguration +metadata: + name: inferenceservice.serving.kserve.io +webhooks: + - name: inferenceservice.kserve-webhook-server.pod-mutator + admissionReviewVersions: [v1beta1] + clientConfig: + service: + name: kserve-webhook-server-service + namespace: kserve + path: /mutate-pods""", + "url": "https://github.com/kubeflow/manifests/blob/master/apps/kserve/webhook.yaml", + "kind": "MutatingWebhookConfiguration", + "name": "inferenceservice.serving.kserve.io", + "type": "kubernetes_resource" + }, + { + "text": """apiVersion: apps/v1 +kind: Deployment +metadata: + name: kserve-controller-manager + namespace: kserve +spec: + replicas: 1 + selector: + matchLabels: + control-plane: kserve-controller-manager""", + "url": "https://github.com/kubeflow/manifests/blob/master/apps/kserve/deployment.yaml", + "kind": 
"Deployment", + "name": "kserve-controller-manager", + "type": "kubernetes_resource" + }, + { + "text": """def create_inference_service(name: str, model_uri: str, namespace: str = 'default'): + \"\"\"Creates a KServe InferenceService for model serving.\"\"\" + from kubernetes import client + isvc = { + 'apiVersion': 'serving.kserve.io/v1beta1', + 'kind': 'InferenceService', + 'metadata': {'name': name, 'namespace': namespace}, + 'spec': {'predictor': {'model': {'modelFormat': {'name': 'sklearn'}, 'storageUri': model_uri}}} + } + return isvc""", + "url": "https://github.com/kubeflow/manifests/blob/master/apps/kserve/utils.py#L10", + "kind": "FunctionDef", + "name": "create_inference_service", + "type": "python_definition" + }, + { + "text": """apiVersion: v1 +kind: ConfigMap +metadata: + name: inferenceservice-config + namespace: kserve +data: + agent: | + { + "image": "kserve/agent:latest", + "memoryRequest": "100Mi", + "memoryLimit": "1Gi" + }""", + "url": "https://github.com/kubeflow/manifests/blob/master/apps/kserve/configmap.yaml", + "kind": "ConfigMap", + "name": "inferenceservice-config", + "type": "kubernetes_resource" + }, + { + "text": """apiVersion: v1 +kind: Service +metadata: + name: kserve-webhook-server-service + namespace: kserve +spec: + ports: + - port: 443 + targetPort: 9443 + selector: + control-plane: kserve-controller-manager""", + "url": "https://github.com/kubeflow/manifests/blob/master/apps/kserve/service.yaml", + "kind": "Service", + "name": "kserve-webhook-server-service", + "type": "kubernetes_resource" + }, +] + +print(f"Embedding {len(sample_code)} code units...") +texts = [d["text"] for d in sample_code] +embeddings = model.encode(texts).tolist() + +collection.insert([{ + "content_text": d["text"], + "source_url": d["url"], + "kind": d["kind"], + "name": d["name"], + "content_type": d["type"], + "h1": d["kind"], + "h2": d["name"], + "vector": e +} for d, e in zip(sample_code, embeddings)]) + +collection.flush() + +index_params = { + 
"metric_type": "COSINE", + "index_type": "IVF_FLAT", + "params": {"nlist": 128} +} +collection.create_index("vector", index_params) +collection.load() + +print(f"Seeded code_index with {collection.num_entities} records") +print("Done - code_index ready") \ No newline at end of file diff --git a/test_agent.py b/test_agent.py new file mode 100644 index 0000000..57176b1 --- /dev/null +++ b/test_agent.py @@ -0,0 +1,22 @@ +from agent.core.graph import agent + +questions = [ + "What is Kubeflow Pipelines and how does it work?", + "How do I fix a CrashLoopBackOff error in my KServe deployment?", + "Show me the webhook configuration YAML for notebooks", +] + +for q in questions: + print(f"\nQ: {q}") + result = agent.invoke({ + "question": q, + "route": "", + "chunks": [], + "citations": [], + "answer": "", + "messages": [] + }) + print(f"Route: {result['route']}") + print(f"Chunks: {len(result['chunks'])}") + print(f"Answer: {result['answer'][:200]}") + print("-" * 50) \ No newline at end of file diff --git a/test_code_ingestion.py b/test_code_ingestion.py new file mode 100644 index 0000000..fe562d4 --- /dev/null +++ b/test_code_ingestion.py @@ -0,0 +1,115 @@ +import yaml +import ast +import json +from pathlib import Path + +# ── TEST 1: YAML parser ─────────────────────────────────── +print("=" * 50) +print("TEST 1 - YAML Resource Parser") +print("=" * 50) + +sample_yaml = """ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kubeflow-pipelines + namespace: kubeflow +spec: + replicas: 1 + selector: + matchLabels: + app: kubeflow-pipelines +--- +apiVersion: v1 +kind: Service +metadata: + name: kubeflow-pipelines-service + namespace: kubeflow +spec: + ports: + - port: 8888 +""" + +units = [] +docs = list(yaml.safe_load_all(sample_yaml)) + +for doc in docs: + if not doc or "kind" not in doc: + continue + units.append({ + "text": yaml.dump(doc, default_flow_style=False), + "kind": doc.get("kind", ""), + "name": doc.get("metadata", {}).get("name", ""), + 
"content_type": "kubernetes_resource", + "source_url": "https://github.com/kubeflow/manifests/blob/master/example.yaml" + }) + +print(f"Parsed {len(units)} Kubernetes resources") +for u in units: + print(f" kind={u['kind']} name={u['name']}") + +assert len(units) == 2, "Expected 2 resources" +assert units[0]["kind"] == "Deployment" +assert units[1]["kind"] == "Service" +print("YAML TEST PASSED") + + +# ── TEST 2: Python AST parser ───────────────────────────── +print() +print("=" * 50) +print("TEST 2 - Python AST Parser") +print("=" * 50) + +sample_python = ''' +def create_pipeline(name: str, description: str = ""): + """Creates a new KFP pipeline with the given name.""" + pipeline = Pipeline(name=name) + pipeline.description = description + return pipeline + + +class PipelineRunner: + """Handles execution of Kubeflow Pipelines.""" + + def run(self, pipeline_id: str): + """Run a pipeline by ID.""" + return self.client.run_pipeline(pipeline_id) + + def _validate(self, pipeline_id: str): + if not pipeline_id: + raise ValueError("pipeline_id cannot be empty") +''' + +tree = ast.parse(sample_python) +units = [] + +for node in ast.walk(tree): + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): + continue + snippet = ast.get_source_segment(sample_python, node) or "" + if len(snippet) < 20: + continue + docstring = ast.get_docstring(node) or "" + units.append({ + "text": snippet, + "kind": type(node).__name__, + "name": node.name, + "docstring": docstring, + "content_type": "python_definition", + "source_url": f"https://github.com/kubeflow/manifests/blob/master/example.py#L{node.lineno}" + }) + +print(f"Parsed {len(units)} Python definitions") +for u in units: + print(f" kind={u['kind']} name={u['name']}") + print(f" docstring={u['docstring'][:50]}") + +assert len(units) >= 3, "Expected at least 3 definitions" +print("PYTHON AST TEST PASSED") + + +# ── SUMMARY ─────────────────────────────────────────────── +print() +print("=" * 50) 
+print("ALL TESTS PASSED - Code ingestion pipeline ready") +print("=" * 50) \ No newline at end of file diff --git a/test_mcp_tools.py b/test_mcp_tools.py new file mode 100644 index 0000000..1778dc1 --- /dev/null +++ b/test_mcp_tools.py @@ -0,0 +1,35 @@ +from agent.tools.mcp_tools import query_docs_tool, query_code_tool, query_both_tool +import json + +print("=" * 50) +print("TEST 1 - query_docs_tool") +print("=" * 50) +result = query_docs_tool("What is Kubeflow Pipelines?") +print(f"Tool: {result['tool']}") +print(f"Count: {result['count']}") +for r in result["results"]: + print(f"\n snippet : {r['golden_snippet'][:80]}") + print(f" url : {r['validation_link']}") + print(f" score : {r['relevance_score']}") + +print("\n" + "=" * 50) +print("TEST 2 - query_code_tool") +print("=" * 50) +result = query_code_tool("webhook configuration YAML") +print(f"Tool: {result['tool']}") +print(f"Count: {result['count']}") +for r in result["results"]: + print(f"\n snippet : {r['golden_snippet'][:80]}") + print(f" url : {r['validation_link']}") + print(f" kind : {r['kind']}") + +print("\n" + "=" * 50) +print("TEST 3 - query_both_tool") +print("=" * 50) +result = query_both_tool("how to install and configure KServe") +print(f"Tool : {result['tool']}") +print(f"Docs count : {len(result['docs_results'])}") +print(f"Code count : {len(result['code_results'])}") +print(f"Total : {result['total_count']}") + +print("\nALL MCP TOOL TESTS PASSED") \ No newline at end of file diff --git a/test_milvus_full.py b/test_milvus_full.py new file mode 100644 index 0000000..759045d --- /dev/null +++ b/test_milvus_full.py @@ -0,0 +1,100 @@ +from pymilvus import connections, utility, Collection +from pymilvus import FieldSchema, CollectionSchema, DataType +from sentence_transformers import SentenceTransformer + +print("Step 1 - Connecting to Milvus...") +connections.connect("default", host="localhost", port="19530") +print("Connected OK") + +print("\nStep 2 - Loading embedding model...") +model = 
SentenceTransformer("sentence-transformers/all-mpnet-base-v2") +print("Model loaded") + +print("\nStep 3 - Creating docs_index collection...") +if utility.has_collection("docs_index"): + utility.drop_collection("docs_index") + +fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="content_text", dtype=DataType.VARCHAR, max_length=2000), + FieldSchema(name="source_url", dtype=DataType.VARCHAR, max_length=500), + FieldSchema(name="h1", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="h2", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="h3", dtype=DataType.VARCHAR, max_length=300), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), +] +schema = CollectionSchema(fields, "Kubeflow docs index") +collection = Collection("docs_index", schema) +print("Collection created") + +print("\nStep 4 - Inserting sample docs...") +sample_docs = [ + { + "text": "Kubeflow Pipelines is a platform for building and deploying ML workflows.", + "url": "https://kubeflow.org/docs/pipelines", + "h1": "Kubeflow Pipelines", + "h2": "Overview", + "h3": "" + }, + { + "text": "KServe provides serverless inferencing on Kubernetes using InferenceService CRD.", + "url": "https://kubeflow.org/docs/kserve", + "h1": "KServe", + "h2": "Installation", + "h3": "" + }, + { + "text": "To install Kubeflow you need a Kubernetes cluster version 1.20 or higher.", + "url": "https://kubeflow.org/docs/started", + "h1": "Getting Started", + "h2": "Prerequisites", + "h3": "Kubernetes version" + }, +] + +texts = [d["text"] for d in sample_docs] +embeddings = model.encode(texts).tolist() + +collection.insert([{ + "content_text": d["text"], + "source_url": d["url"], + "h1": d["h1"], + "h2": d["h2"], + "h3": d["h3"], + "vector": e +} for d, e in zip(sample_docs, embeddings)]) + +collection.flush() + +index_params = { + "metric_type": "COSINE", + "index_type": "IVF_FLAT", + "params": {"nlist": 128} +} 
+collection.create_index("vector", index_params) +collection.load() +print(f"Inserted {collection.num_entities} documents") + +print("\nStep 5 - Searching...") +query = "how to install kubeflow" +query_vec = model.encode(query).tolist() + +results = collection.search( + data=[query_vec], + anns_field="vector", + param={"metric_type": "COSINE", "params": {"nprobe": 10}}, + limit=3, + output_fields=["content_text", "source_url", "h1", "h2"] +) + +print(f"Query: '{query}'") +print("Results:") +for i, hit in enumerate(results[0]): + print(f"\n Result {i+1}:") + print(f" score : {hit.score:.4f}") + print(f" h1 : {hit.entity.get('h1')}") + print(f" h2 : {hit.entity.get('h2')}") + print(f" text : {hit.entity.get('content_text')[:80]}") + print(f" url : {hit.entity.get('source_url')}") + +print("\nPhase 1 end to end test PASSED") \ No newline at end of file diff --git a/verify_changes.py b/verify_changes.py new file mode 100644 index 0000000..231a2bf --- /dev/null +++ b/verify_changes.py @@ -0,0 +1,16 @@ +content = open('pipelines/kubeflow-pipeline.py', encoding='utf-8').read() + +if 'h1": record.get("h1' in content: + print('A3 OK - h1/h2/h3 fields in records.append') +else: + print('A3 MISSING - h1/h2/h3 not found in records.append') + +if 'docs_index' in content: + print('A4 OK - collection renamed to docs_index') +else: + print('A4 MISSING - still says docs_rag') + +if 'MarkdownHeaderTextSplitter' in content: + print('A1 OK - MarkdownHeaderTextSplitter present') +else: + print('A1 MISSING - chunker not upgraded') \ No newline at end of file