diff --git a/.env b/.env
index 16b7720e75..2f62a187bb 100644
--- a/.env
+++ b/.env
@@ -194,6 +194,24 @@
 PROMETHEUS_PORT=9090
 PROMETHEUS_HOST=prometheus
 PROMETHEUS_ADDR=${PROMETHEUS_HOST}:${PROMETHEUS_PORT}
+# Agent
+GRAPH_RECURSION_LIMIT=25
+MCP_ENDPOINT=mcp
+MCP_PORT=8011
+MCP_ENABLED=False
+AGENT_ENDPOINT=agent
+AGENT_PORT=8010
+CHATBOT_PORT=7860
+LLM_BASE_URL=
+LLM_MODEL=
+USE_VCR=True
+LLM_TLS_VERIFY=True
+AGENT_DOCKERFILE=src/agent/Dockerfile
+MCP_DOCKERFILE=src/mcp/Dockerfile
+CHATBOT_DOCKERFILE=src/chatbot/Dockerfile
+AGENT_ADDR=${AGENT_ENDPOINT}:${AGENT_PORT}
+APPLICATION_ENDPOINT=frontend:8080
+OTEL_EXPORTER_OTLP_INSECURE=true
 # Telemetry Documentation
 TELEMETRY_DOCS_HOST=telemetry-docs
 TELEMETRY_DOCS_PORT=8000
diff --git a/.gitignore b/.gitignore
index ecd031845b..a7198f68cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,8 @@
 src/shipping/target/
 test/tracetesting/tracetesting-vars.yaml
 
 !src/currency/build
+src/cart/src/Program.cs.bak
+src/flagd/demo.flagd.json.bak
+
+src/agent/fixtures/*
diff --git a/docker-compose.yml b/docker-compose.yml
index a1bd187509..d5867a4c18 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -606,7 +606,7 @@ services:
     deploy:
       resources:
         limits:
-          memory: 500M   # This is high to enable supporting the recommendationCache feature flag use case
+          memory: 500M # This is high to enable supporting the recommendationCache feature flag use case
     restart: unless-stopped
     ports:
       - "${RECOMMENDATION_PORT}"
@@ -677,18 +677,13 @@
       - GOMEMLIMIT=60MiB
       - OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES},service.criticality=low
       - OTEL_SERVICE_NAME=flagd
-    command: [
-      "start",
-      "--uri",
-      "file:./etc/flagd/demo.flagd.json"
-    ]
+    command: ["start", "--uri", "file:./etc/flagd/demo.flagd.json"]
     ports:
       - "${FLAGD_PORT}"
       - "${FLAGD_OFREP_PORT}"
     volumes:
       - ./src/flagd:/etc/flagd
-    logging:
-      *logging
+    logging: *logging
 
   # Flagd UI for configuring the feature flag service
   flagd-ui:
@@ -780,6 +775,105 @@ services:
         condition: service_started
     logging: *logging
 
+  # Agent
+  agent:
+    image: ${IMAGE_NAME}:${DEMO_VERSION}-agent
+    container_name: agent
+    build:
+      context: ./
+      dockerfile: ${AGENT_DOCKERFILE}
+      cache_from:
+        - ${IMAGE_NAME}:${IMAGE_VERSION}-agent
+    deploy:
+      resources:
+        limits:
+          memory: 500M
+    restart: unless-stopped
+    ports:
+      - "${AGENT_PORT}"
+    environment:
+      - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_GRPC}
+      - GRAPH_RECURSION_LIMIT
+      - MCP_ENDPOINT=mcp
+      - MCP_PORT=8011
+      - MCP_ENABLED=True
+      - AGENT_ENDPOINT
+      - AGENT_PORT
+      - LLM_BASE_URL
+      - LLM_MODEL
+      - USE_VCR=True
+      - OPENAI_API_KEY
+      - APPLICATION_ENDPOINT=frontend:8080
+      - OTEL_EXPORTER_OTLP_INSECURE=true
+      - OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES},service.criticality=low
+      - OTEL_SERVICE_NAME=agent
+    depends_on:
+      jaeger:
+        condition: service_started
+      mcp:
+        condition: service_started
+      otel-collector:
+        condition: service_started
+      product-catalog:
+        condition: service_started
+    logging: *logging
+
+  # MCP
+  mcp:
+    image: ${IMAGE_NAME}:${DEMO_VERSION}-mcp
+    container_name: mcp
+    build:
+      context: ./
+      dockerfile: ${MCP_DOCKERFILE}
+      cache_from:
+        - ${IMAGE_NAME}:${IMAGE_VERSION}-mcp
+    deploy:
+      resources:
+        limits:
+          memory: 500M
+    restart: unless-stopped
+    ports:
+      - "${MCP_PORT}"
+    environment:
+      - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_COLLECTOR_HOST}:${OTEL_COLLECTOR_PORT_GRPC}
+      - MCP_ENDPOINT
+      - MCP_PORT
+      - OTEL_EXPORTER_OTLP_INSECURE=true
+      - APPLICATION_ENDPOINT=frontend:8080
+      - OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES},service.criticality=low
+      - OTEL_SERVICE_NAME=mcp
+    depends_on:
+      otel-collector:
+        condition: service_started
+    logging: *logging
+
+  # ChatBot
+  chatbot:
+    image: ${IMAGE_NAME}:${DEMO_VERSION}-chatbot
+    container_name: chatbot
+    build:
+      context: ./
+      dockerfile: ${CHATBOT_DOCKERFILE}
+      cache_from:
+        - ${IMAGE_NAME}:${IMAGE_VERSION}-chatbot
+    deploy:
+      resources:
+        limits:
+          memory: 500M
+    restart: unless-stopped
+    ports:
+      - "${CHATBOT_PORT}"
+    environment:
+      - AGENT_PORT
+      - AGENT_ENDPOINT
+      - CHATBOT_PORT
+      - OTEL_RESOURCE_ATTRIBUTES=${OTEL_RESOURCE_ATTRIBUTES},service.criticality=low
+      - OTEL_SERVICE_NAME=chatbot
+    depends_on:
+      agent:
+        condition: service_started
+    logging: *logging
+
   # PostgreSQL database for the astronomy demo
   astronomy-db:
     image: ${POSTGRES_IMAGE}
@@ -812,7 +906,6 @@
       - "${VALKEY_PORT}"
     logging: *logging
 
-
 # ********************
 # Telemetry Components
 # ********************
@@ -868,7 +961,11 @@
         limits:
           memory: 200M
     restart: unless-stopped
-    command: [ "--config=/etc/otelcol-config.yml", "--config=/etc/otelcol-config-extras.yml" ]
+    command:
+      [
+        "--config=/etc/otelcol-config.yml",
+        "--config=/etc/otelcol-config-extras.yml",
+      ]
     user: 0:0
     volumes:
       - ${HOST_FILESYSTEM}:/hostfs:ro
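Note: with these compose additions in place, the agent's HTTP API can be smoke-tested directly, independent of the Gradio UI. A minimal sketch, assuming AGENT_PORT (8010) is reachable on localhost — the compose file above publishes it on an ephemeral host port by default, so adjust the URL accordingly; the payload shape follows the ChatRequest model defined later in src/agent/src/agents/agents.py:

    import requests

    resp = requests.post(
        "http://localhost:8010/prompt",
        json={"message": "List the products in the Astronomy Shop", "history": []},
        timeout=300,  # the agent calls an LLM, so allow a generous timeout
    )
    resp.raise_for_status()
    print(resp.json()["response"])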
diff --git a/src/agent/Dockerfile b/src/agent/Dockerfile
new file mode 100644
index 0000000000..c8fae4a938
--- /dev/null
+++ b/src/agent/Dockerfile
@@ -0,0 +1,24 @@
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+FROM python:3.11-slim
+
+ENV PYTHONDONTWRITEBYTECODE=1 \
+    PYTHONUNBUFFERED=1
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    build-essential \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY src/agent/requirements.txt .
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY src/agent/src src
+COPY src/agent/run.py .
+
+ARG AGENT_PORT=8010
+EXPOSE ${AGENT_PORT}
+
+CMD ["python", "run.py"]
diff --git a/src/agent/requirements.txt b/src/agent/requirements.txt
new file mode 100644
index 0000000000..38fd3f9804
--- /dev/null
+++ b/src/agent/requirements.txt
@@ -0,0 +1,18 @@
+requests==2.32.5
+traceloop-sdk==0.47.5
+dotenv==0.9.9
+httpx==0.28.1
+langchain_openai==1.0.1
+langchain==1.0.5
+langgraph-prebuilt==1.0.5
+mcp==1.22.0
+fastmcp>=2.13.1
+langchain_mcp_adapters==0.1.14
+langchain_community==0.4.1
+fastapi==0.120.4
+opentelemetry-api==1.38.0
+opentelemetry-sdk==1.38.0
+opentelemetry-semantic-conventions==0.59b0
+opentelemetry-instrumentation-langchain<0.53.0
+gradio==6.0.1
+vcrpy==7.0.0
diff --git a/src/agent/run.py b/src/agent/run.py
new file mode 100644
index 0000000000..ed5a55900c
--- /dev/null
+++ b/src/agent/run.py
@@ -0,0 +1,37 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+
+import asyncio
+import logging
+import os
+
+from dotenv import load_dotenv
+from src.agents.agents import Agent
+from traceloop.sdk import Traceloop
+
+logging.basicConfig(level=logging.INFO)
+
+load_dotenv()
+
+Traceloop.init(
+    app_name="AstronomyShopAgent",
+    api_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
+)
+
+
+async def start_servers():
+    """Run the LangGraph Agent server."""
+    tasks = []
+    agent = Agent()
+    tasks.append(agent.launch())
+    await asyncio.gather(*tasks)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(start_servers())
+    except KeyboardInterrupt:
+        logging.info("Shutting down servers...")
diff --git a/src/agent/src/agents/agents.py b/src/agent/src/agents/agents.py
new file mode 100644
index 0000000000..970541e5d2
--- /dev/null
+++ b/src/agent/src/agents/agents.py
@@ -0,0 +1,107 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+from contextlib import asynccontextmanager
+from typing import Dict, List
+
+import uvicorn
+from fastapi import FastAPI, HTTPException
+from langchain.agents import create_agent
+from langchain.tools import tool
+from langchain_mcp_adapters.tools import load_mcp_tools
+from pydantic import BaseModel
+from src.agents.llm import ChatLLM
+from src.agents.mcp_client import MCPClient
+from src.agents.tools import (
+    add_to_cart,
+    checkout,
+    empty_cart,
+    get_ads,
+    get_cart,
+    get_product,
+    get_recommendations,
+    get_shipping_quote,
+    get_supported_currencies,
+    list_products,
+)
+from traceloop.sdk.decorators import workflow
+
+
+class ChatRequest(BaseModel):
+    message: str
+    history: List[Dict] = []
+
+
+class Agent:
+    def __init__(self):
+        self.app = FastAPI(lifespan=self.lifespan)
+        self.app.post("/prompt")(self.handle_prompt)
+        self.agentRecursionLimit = int(os.getenv("GRAPH_RECURSION_LIMIT", "25"))
+        self.mcp_server_url = f"http://{os.getenv('MCP_ENDPOINT', '0.0.0.0')}:{os.getenv('MCP_PORT', '8011')}/mcp"
+
+        self.mcp_server = None
+
+    @asynccontextmanager
+    async def lifespan(self, app: FastAPI):
+        mcp_enabled = os.getenv("MCP_ENABLED", "False") == "True"
+        if mcp_enabled:
+            logging.info("MCP tools enabled")
+            self.mcp_server = MCPClient()
+            await self.mcp_server.connect_to_mcp_server(self.mcp_server_url)
+        yield
+        if self.mcp_server:
+            await self.mcp_server.cleanup()
+
+    async def handle_prompt(self, request: ChatRequest):
+        return await self.run_agent(request.message)
+
+    async def get_tool_list(self):
+        mcp_enabled = os.getenv("MCP_ENABLED", "False") == "True"
+        if mcp_enabled and self.mcp_server is not None:
+            return await load_mcp_tools(self.mcp_server.session)
+        else:
+            tool_list = [
+                add_to_cart,
+                checkout,
+                empty_cart,
+                get_ads,
+                get_cart,
+                get_product,
+                get_recommendations,
+                get_shipping_quote,
+                get_supported_currencies,
+                list_products,
+            ]
+            return [tool(t) for t in tool_list]
+
+    @workflow(name="astronomy_shop_agent_workflow")
+    async def run_agent(self, input_prompt):
+        model = ChatLLM()
+        tools = await self.get_tool_list()
+        agent = create_agent(
+            model,
+            tools=tools,
+            system_prompt="You are a helpful assistant. Be concise and accurate.",
+        )
+        try:
+            result = await agent.ainvoke(
+                {"messages": [{"role": "user", "content": input_prompt}]}
+            )
+            return {"response": result}
+        except Exception as e:
+            raise HTTPException(status_code=500, detail=str(e)) from e
+
+    async def launch(self):
+        agent_port = int(os.getenv("AGENT_PORT", "8010"))
+        agent_config = uvicorn.Config(
+            app=self.app,
+            host="0.0.0.0",
+            port=agent_port,
+            log_level="info",
+        )
+        agent_server = uvicorn.Server(agent_config)
+        await agent_server.serve()
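Note: the `[tool(t) for t in tool_list]` fallback above relies on langchain's `tool()` turning a plain function into a structured tool, with the signature and docstring becoming the schema the model sees. A standalone sketch of that mechanism (the `ping` function is illustrative, not part of this patch):

    from langchain.tools import tool

    def ping(host: str):
        """Return a canned reply for a host; this docstring becomes the tool description."""
        return f"pong from {host}"

    ping_tool = tool(ping)        # wrap the plain function
    print(ping_tool.name)         # "ping"
    print(ping_tool.description)  # the docstring text above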
diff --git a/src/agent/src/agents/llm.py b/src/agent/src/agents/llm.py
new file mode 100644
index 0000000000..6f8f21e460
--- /dev/null
+++ b/src/agent/src/agents/llm.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+
+import httpx
+from langchain_openai import ChatOpenAI
+from src.agents.patch_vcr import VCR
+
+
+class ChatLLM(ChatOpenAI):
+    def __init__(self, **kwargs):
+        model_name = os.getenv("LLM_MODEL", "default")
+        use_vcr = os.getenv("USE_VCR", "False") == "True"
+        llm_tls_verify = os.getenv("LLM_TLS_VERIFY", "True") == "True"
+        cassette_name = ""
+        if use_vcr:
+            cassette_name = f"{model_name.replace('/', '_')}_cassette.yaml"
+        if "http_async_client" not in kwargs:
+            kwargs["http_async_client"] = httpx.AsyncClient(verify=llm_tls_verify)
+        kwargs.setdefault("openai_api_base", os.getenv("LLM_BASE_URL"))
+        kwargs.setdefault("model", model_name)
+        super().__init__(**kwargs)
+
+        object.__setattr__(self, "_use_vcr", use_vcr)
+        object.__setattr__(self, "_cassette_name", cassette_name)
+
+    def _generate(self, messages, stop=None, run_manager=None, **kwargs):
+        if getattr(self, "_use_vcr", False):
+            with VCR.use_cassette(self._cassette_name):
+                return super()._generate(messages, stop, run_manager, **kwargs)
+        return super()._generate(messages, stop, run_manager, **kwargs)
+
+    async def _agenerate(self, messages, stop=None, run_manager=None, **kwargs):
+        if getattr(self, "_use_vcr", False):
+            with VCR.use_cassette(self._cassette_name):
+                return await super()._agenerate(messages, stop, run_manager, **kwargs)
+        return await super()._agenerate(messages, stop, run_manager, **kwargs)
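Note: the cassette file name above is derived solely from LLM_MODEL, so each model records and replays its own fixture under fixtures/vcr_cassettes/ (ignored via the src/agent/fixtures/* .gitignore entry). A standalone sketch of the derivation, with an illustrative model name:

    import os

    os.environ["LLM_MODEL"] = "meta/llama-3.1-8b-instruct"
    model_name = os.getenv("LLM_MODEL", "default")
    # '/' is not filesystem-safe, so it is replaced before building the file name
    print(f"{model_name.replace('/', '_')}_cassette.yaml")
    # -> meta_llama-3.1-8b-instruct_cassette.yaml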
diff --git a/src/agent/src/agents/mcp_client.py b/src/agent/src/agents/mcp_client.py
new file mode 100644
index 0000000000..0767729d50
--- /dev/null
+++ b/src/agent/src/agents/mcp_client.py
@@ -0,0 +1,28 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+from contextlib import AsyncExitStack
+
+from mcp import ClientSession
+from mcp.client.streamable_http import streamablehttp_client
+
+
+class MCPClient:
+    def __init__(self):
+        self.exit_stack = AsyncExitStack()
+        self.session = None
+
+    async def connect_to_mcp_server(self, url):
+        stream_context = streamablehttp_client(url=url)
+        read, write, _ = await self.exit_stack.enter_async_context(stream_context)
+        session_context = ClientSession(read, write)
+        self.session = await self.exit_stack.enter_async_context(session_context)
+        await self.session.initialize()
+
+    async def cleanup(self):
+        try:
+            await self.exit_stack.aclose()
+        except Exception as e:
+            print(f"Error closing connection: {e}")
diff --git a/src/agent/src/agents/patch_vcr.py b/src/agent/src/agents/patch_vcr.py
new file mode 100644
index 0000000000..bf45795d19
--- /dev/null
+++ b/src/agent/src/agents/patch_vcr.py
@@ -0,0 +1,65 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import logging
+
+import vcr
+import vcr.stubs.httpx_stubs
+
+logging.getLogger("vcr").setLevel(logging.ERROR)
+
+
+async def patched_to_serialized_response(response, aread=False):
+    if hasattr(response, "_decoder"):
+        del response._decoder
+
+    if aread:
+        content = await response.aread()
+    else:
+        content = response.read()
+
+    return {
+        "status": {"code": response.status_code, "message": response.reason_phrase},
+        "headers": dict(response.headers),
+        "body": {"string": content},
+    }
+
+
+vcr.stubs.httpx_stubs._to_serialized_response = patched_to_serialized_response
+
+
+def normalize_body(request):
+    try:
+        if request.body:
+            encoding = "utf-8"
+            if hasattr(request.body, "decode"):
+                data = json.loads(request.body.decode(encoding))
+            else:
+                data = json.loads(request.body)
+            request.body = json.dumps(data, sort_keys=True).encode(encoding)
+    except Exception:
+        pass
+    return request
+
+
+def clean_response(response):
+    headers = response["headers"]
+    headers.pop("content-encoding", None)
+    headers.pop("transfer-encoding", None)
+    return response
+
+
+VCR = vcr.VCR(
+    cassette_library_dir="fixtures/vcr_cassettes",
+    record_mode="new_episodes",
+    serializer="yaml",
+    path_transformer=vcr.VCR.ensure_suffix(".yaml"),
+    filter_headers=["authorization", "x-api-key", "api-key"],
+    match_on=["method", "uri", "body"],
+    before_record_request=normalize_body,
+    before_record_response=clean_response,
+    decode_compressed_response=True,
+)
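Note: because requests are matched on body, normalize_body keeps JSON key order from producing spurious cache misses: two payloads that differ only in key order serialize to identical bytes once sort_keys is applied. A standalone illustration (not part of the patch):

    import json

    a = json.dumps({"model": "m", "messages": []}, sort_keys=True).encode("utf-8")
    b = json.dumps({"messages": [], "model": "m"}, sort_keys=True).encode("utf-8")
    assert a == b  # identical bytes, so VCR's body matcher treats them as the same request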
diff --git a/src/agent/src/agents/tools.py b/src/agent/src/agents/tools.py
new file mode 100644
index 0000000000..d28fa97118
--- /dev/null
+++ b/src/agent/src/agents/tools.py
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import os
+
+import requests
+
+BASE_URL = os.getenv("APPLICATION_ENDPOINT", "localhost:8080")
+
+
+def get_ads(category: str):
+    """Fetch promotional ads for the Astronomy Shop homepage.
+    E.g. category: `telescopes` or `travel`."""
+    url = f"http://{BASE_URL}/api/data"
+    params = {"contextKeys": category}
+    try:
+        res = requests.get(url, params=params)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error fetching ads: {e}"
+
+
+def add_to_cart(user_id: str, product_id: str, quantity: int = 1):
+    """Add a product (product_id) to the shopping cart for a user (user_id)."""
+    url = f"http://{BASE_URL}/api/cart"
+    data = {
+        "item": {
+            "productId": product_id,
+            "quantity": quantity,
+        },
+        "userId": user_id,
+    }
+    try:
+        res = requests.post(url, json=data)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while adding product to cart: {e}"
+
+
+def get_cart(user_id: str):
+    """Retrieve the current contents of a user's cart."""
+    url = f"http://{BASE_URL}/api/cart"
+    try:
+        res = requests.get(url, params={"user_id": user_id})
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while fetching cart: {e}"
+
+
+def empty_cart(user_id: str):
+    """Empty the shopping cart for a user."""
+    url = f"http://{BASE_URL}/api/cart/empty/{user_id}"
+    try:
+        res = requests.get(url)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while emptying cart: {e}"
+
+
+def list_products():
+    """List all products available in the Astronomy Shop."""
+    url = f"http://{BASE_URL}/api/products"
+    try:
+        res = requests.get(url)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while fetching product list: {e}"
+
+
+def get_product(product_id: str):
+    """Get detailed information about a product using its ID."""
+    url = f"http://{BASE_URL}/api/products/{product_id}"
+    try:
+        res = requests.get(url)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while fetching product {product_id}: {e}"
+
+
+def checkout(checkout_person):
+    """Checkout the user's cart and create an order.
+    Takes a request in the format {string userId, string userCurrency, Address address, string email, CreditCardInfo creditCard},
+    where Address is {string streetAddress, string city, string state, string country, string zipCode} and
+    CreditCardInfo is {string creditCardNumber, int32 creditCardCvv, int32 creditCardExpirationYear, int32 creditCardExpirationMonth}.
+    """
+    url = f"http://{BASE_URL}/api/checkout"
+    try:
+        res = requests.post(url, json=checkout_person)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while performing checkout: {e}"
+
+
+def get_supported_currencies():
+    """List the currencies supported by the Astronomy Shop."""
+    url = f"http://{BASE_URL}/api/currency"
+    try:
+        res = requests.get(url)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error while fetching currency list: {e}"
+
+
+def get_recommendations(product_id: str):
+    """Get product recommendations for a user."""
+    url = f"http://{BASE_URL}/api/recommendations"
+    params = {"productIds": product_id}
+    try:
+        res = requests.get(url, params=params)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error fetching recommendations: {e}"
+
+
+def get_shipping_quote(items, currency_code, address):
+    """Get the estimated shipping cost for a given address.
+    Items is {string productId, int32 quantity},
+    CurrencyCode is a currency identifier, e.g. `USD`, and
+    Address is {string streetAddress, string city, string state, string country, string zipCode}.
+    """
+    url = f"http://{BASE_URL}/api/shipping"
+    params = {"itemList": json.dumps(items), "currencyCode": currency_code, "address": json.dumps(address)}
+    try:
+        res = requests.get(url, params=params)
+        res.raise_for_status()
+        return res.json()
+    except Exception as e:
+        return f"Error fetching shipping quote: {e}"
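Note: since checkout takes a free-form dict, a concrete payload helps; a sketch with made-up values (field names follow the docstring above, the card number is the demo's usual test value, everything else is illustrative):

    sample_checkout = {
        "userId": "anonymous-1",
        "userCurrency": "USD",
        "address": {
            "streetAddress": "1600 Amphitheatre Parkway",
            "city": "Mountain View",
            "state": "CA",
            "country": "USA",
            "zipCode": "94043",
        },
        "email": "someone@example.com",
        "creditCard": {
            "creditCardNumber": "4432-8015-6152-0454",
            "creditCardCvv": 672,
            "creditCardExpirationYear": 2030,
            "creditCardExpirationMonth": 1,
        },
    }
    # checkout(sample_checkout) POSTs this to http://<APPLICATION_ENDPOINT>/api/checkout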
diff --git a/src/chatbot/Dockerfile b/src/chatbot/Dockerfile
new file mode 100644
index 0000000000..4e071d19ee
--- /dev/null
+++ b/src/chatbot/Dockerfile
@@ -0,0 +1,24 @@
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+FROM python:3.11-slim
+
+ENV PYTHONDONTWRITEBYTECODE=1 \
+    PYTHONUNBUFFERED=1
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    build-essential \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY src/chatbot/requirements.txt .
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY src/chatbot/src src
+COPY src/chatbot/run.py .
+
+ARG CHATBOT_PORT=7860
+EXPOSE ${CHATBOT_PORT}
+
+CMD ["python", "run.py"]
diff --git a/src/chatbot/requirements.txt b/src/chatbot/requirements.txt
new file mode 100644
index 0000000000..284361f02b
--- /dev/null
+++ b/src/chatbot/requirements.txt
@@ -0,0 +1,5 @@
+requests==2.32.5
+dotenv==0.9.9
+httpx==0.28.1
+fastapi==0.120.4
+gradio==6.0.1
diff --git a/src/chatbot/run.py b/src/chatbot/run.py
new file mode 100644
index 0000000000..18adbb70b3
--- /dev/null
+++ b/src/chatbot/run.py
@@ -0,0 +1,33 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+
+import asyncio
+import logging
+
+from dotenv import load_dotenv
+from src.chat_interface.chat_interface import ChatAgentUI, get_chat_ui_config
+
+logging.basicConfig(level=logging.INFO)
+
+load_dotenv()
+
+
+async def start_servers():
+    """Run the chatbot server."""
+    tasks = []
+
+    chat_ui_config = get_chat_ui_config()
+    chat_interface = ChatAgentUI(chat_ui_config)
+    tasks.append(asyncio.to_thread(chat_interface.launch))
+
+    await asyncio.gather(*tasks)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(start_servers())
+    except KeyboardInterrupt:
+        logging.info("Shutting down servers...")
diff --git a/src/chatbot/src/chat_interface/chat_interface.py b/src/chatbot/src/chat_interface/chat_interface.py
new file mode 100644
index 0000000000..72536d3417
--- /dev/null
+++ b/src/chatbot/src/chat_interface/chat_interface.py
@@ -0,0 +1,92 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+import uuid
+
+import gradio as gr
+import requests
+from pydantic import BaseModel
+
+
+class ChatUiConfig(BaseModel):
+    uiBaseUrl: str
+    uiPort: int
+    sessionId: str
+    timeout: int
+    agentBaseUrl: str
+
+
+class ChatAgentUI:
+    def __init__(self, chat_ui_config: ChatUiConfig):
+        self.config = chat_ui_config
+
+    def chat_with_agent(self, message, history):
+        try:
+            payload = {
+                "message": message,
+                "session_id": self.config.sessionId,
+                "history": history,
+            }
+            logging.info(f"Sending request {payload} to Agent")
+            response = requests.post(
+                self.config.agentBaseUrl, json=payload, timeout=self.config.timeout
+            )
+            response.raise_for_status()
+
+            agent_data = response.json().get("response", {})
+            messages = agent_data.get("messages", [])
+            if messages and isinstance(messages, list):
+                return messages[-1].get(
+                    "content", "Agent returned an empty message body."
+                )
+            return "Error: Received an unexpected response format from the agent."
+
+        except Exception as e:
+            logging.error(f"Error: {e}")
+            return f"Error: {e}"
+
+    def launch(self, agent_config=None):
+        config = {
+            "title": "Astronomy Shop Agent",
+            "description": "Ask me about the Astronomy Shop application; I will ask for more information if needed.",
+            "examples": [
+                [
+                    "What are the categories of products available in the Astronomy Shop, and what products are in each category?"
+                ],
+                ["Get me all shipping codes in the Astronomy Shop."],
+                ["Get all the items in the cart for user anonymous-1."],
+            ],
+        }
+
+        if agent_config:
+            config.update(
+                agent_config[0] if isinstance(agent_config, list) else agent_config
+            )
+
+        chatbot_ui = gr.ChatInterface(
+            fn=self.chat_with_agent,
+            title=config["title"],
+            description=config["description"],
+            examples=config["examples"],
+            chatbot=gr.Chatbot(height="70vh"),
+        )
+
+        chatbot_ui.launch(
+            server_name=self.config.uiBaseUrl,
+            server_port=self.config.uiPort,
+        )
+
+
+def get_chat_ui_config():
+    chat_ui_config = ChatUiConfig(
+        uiBaseUrl=os.getenv("CHATBOT_ENDPOINT", "0.0.0.0"),
+        uiPort=int(os.getenv("CHATBOT_PORT", "7860")),
+        sessionId=str(uuid.uuid4()),
+        timeout=int(os.getenv("AGENT_CHAT_INTERFACE_TIMEOUT", "300")),
+        agentBaseUrl=f"http://{os.getenv('AGENT_ENDPOINT', '0.0.0.0')}:{os.getenv('AGENT_PORT', '8010')}/prompt",
+    )
+    return chat_ui_config
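Note: chat_with_agent assumes the agent's /prompt reply wraps a LangGraph result whose messages list ends with the assistant's answer. A trimmed, hypothetical sketch of the shape it parses — real serialized LangChain messages carry more fields than just "content":

    agent_reply = {
        "response": {
            "messages": [
                {"content": "List all products"},
                {"content": "The shop sells telescopes, binoculars, ..."},
            ]
        }
    }
    messages = agent_reply["response"]["messages"]
    print(messages[-1]["content"])  # the string the Gradio UI displays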
+ ) + return "Error: Received an unexpected response format from the agent." + + except Exception as e: + logging.error(f"Error : {e}") + return f"Error: {e}" + + def launch(self, agent_config=None): + config = { + "title": "Astronomy Shop agent", + "description": "Ask me about the astronomy shop application, I will ask for more information if needed.", + "examples": [ + [ + "What are the categories of products available in Astronomy Shop Application and what are the products in each category" + ], + ["Get me all shipping codes in Astronomy Shop Application?"], + ["Get all the items in the cart for user anonymous-1"], + ], + } + + if agent_config: + config.update( + agent_config[0] if isinstance(agent_config, list) else agent_config + ) + + chatbot_ui = gr.ChatInterface( + fn=self.chat_with_agent, + title=config["title"], + description=config["description"], + examples=config["examples"], + chatbot=gr.Chatbot(height="70vh"), + ) + + chatbot_ui.launch( + server_name=self.config.uiBaseUrl, + server_port=self.config.uiPort, + ) + + +def get_chat_ui_config(): + chat_ui_config = ChatUiConfig( + uiBaseUrl=os.getenv("CHATBOT_ENDPOINT", "0.0.0.0"), + uiPort=int(os.getenv("CHATBOT_PORT", "7860")), + sessionId=str(uuid.uuid4()), + timeout=int(os.getenv("AGENT_CHAT_INTERFACE_TIMEOUT", "300")), + agentBaseUrl=f"http://{os.getenv('AGENT_ENDPOINT', '0.0.0.0')}:{os.getenv('AGENT_PORT', '8010')}/prompt", + ) + return chat_ui_config diff --git a/src/mcp/Dockerfile b/src/mcp/Dockerfile new file mode 100644 index 0000000000..b2e587580e --- /dev/null +++ b/src/mcp/Dockerfile @@ -0,0 +1,24 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 +WORKDIR /app + +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +COPY src/mcp/requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/mcp/src src +COPY src/agent/src/agents/tools.py src/mcp_server +COPY src/mcp/run.py . 
diff --git a/src/mcp/requirements.txt b/src/mcp/requirements.txt
new file mode 100644
index 0000000000..1ec12d926b
--- /dev/null
+++ b/src/mcp/requirements.txt
@@ -0,0 +1,12 @@
+requests==2.32.5
+traceloop-sdk==0.47.5
+dotenv==0.9.9
+httpx==0.28.1
+langchain==1.0.3
+mcp==1.22.0
+fastmcp==2.13.1
+fastapi==0.120.4
+opentelemetry-api==1.38.0
+opentelemetry-sdk==1.38.0
+opentelemetry-semantic-conventions==0.59b0
+opentelemetry-instrumentation-langchain<0.53.0
diff --git a/src/mcp/run.py b/src/mcp/run.py
new file mode 100644
index 0000000000..e4ce081467
--- /dev/null
+++ b/src/mcp/run.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+
+import asyncio
+import logging
+import os
+
+from dotenv import load_dotenv
+from src.mcp_server.astronomy_shop_mcp_server import AstronomyShopMcp
+from traceloop.sdk import Traceloop
+
+logging.basicConfig(level=logging.INFO)
+
+load_dotenv()
+
+Traceloop.init(
+    app_name="AstronomyShopAgentMCP",
+    api_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
+)
+
+
+async def start_servers():
+    """Run the MCP server."""
+    tasks = []
+    mcp = AstronomyShopMcp()
+    mcp_server_task = asyncio.to_thread(mcp.run)
+    tasks.append(mcp_server_task)
+    logging.info("Starting MCP server...")
+
+    await asyncio.gather(*tasks)
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(start_servers())
+    except KeyboardInterrupt:
+        logging.info("Shutting down servers...")
diff --git a/src/mcp/src/mcp_server/astronomy_shop_mcp_server.py b/src/mcp/src/mcp_server/astronomy_shop_mcp_server.py
new file mode 100644
index 0000000000..046f9a729c
--- /dev/null
+++ b/src/mcp/src/mcp_server/astronomy_shop_mcp_server.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python
+
+# Copyright The OpenTelemetry Authors
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import os
+
+from fastmcp import FastMCP
+from src.mcp_server import tools
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class AstronomyShopMcp:
+    def __init__(self) -> None:
+        self.host = "0.0.0.0"
+        self.port = int(os.getenv("MCP_PORT", "8011"))
+        self.mcp = FastMCP("astronomy-shop-mcp")
+
+        self._register_tools()
+
+    def _register_tools(self):
+        """Register the shared tool functions from tools.py as MCP tools."""
+        self.mcp.tool("add_to_cart")(tools.add_to_cart)
+        self.mcp.tool("checkout")(tools.checkout)
+        self.mcp.tool("empty_cart")(tools.empty_cart)
+        self.mcp.tool("get_ads")(tools.get_ads)
+        self.mcp.tool("get_cart")(tools.get_cart)
+        self.mcp.tool("get_product")(tools.get_product)
+        self.mcp.tool("get_recommendations")(tools.get_recommendations)
+        self.mcp.tool("get_shipping_quote")(tools.get_shipping_quote)
+        self.mcp.tool("get_supported_currencies")(tools.get_supported_currencies)
+        self.mcp.tool("list_products")(tools.list_products)
+
+    def run(self):
+        """Start the MCP server using the streamable HTTP transport."""
+        logger.info(f"Starting FastMCP Server on {self.host}:{self.port}")
+        self.mcp.run(transport="http", host=self.host, port=self.port)
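Note: for local verification, the MCP server can be queried with the same client stack the agent uses. A minimal sketch, assuming the server is reachable at localhost:8011 (adjust host and port to your setup):

    import asyncio

    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client


    async def main():
        # Open the streamable-HTTP transport, then an MCP session on top of it
        async with streamablehttp_client(url="http://localhost:8011/mcp") as (read, write, _):
            async with ClientSession(read, write) as session:
                await session.initialize()
                tools = await session.list_tools()
                print([t.name for t in tools.tools])  # expect the ten shop tools


    asyncio.run(main())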