Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ak-py/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ gmail = [
"google-auth-httplib2>=0.2.0",
"google-api-python-client>=2.0.0",
]
# Optional extra for the LiveKit voice integration (agents runtime, OpenAI/Deepgram/Silero plugins, server API).
# NOTE(review): the LiveKit config also accepts "elevenlabs" and "google" as TTS providers and the handler
# imports livekit.plugins.elevenlabs / livekit.plugins.google on demand, but neither plugin is listed here —
# confirm whether they should be added or are meant to be installed separately.
# NOTE(review): the handler imports AgentServer from livekit.agents — confirm the >=1.0.0 floor covers it.
livekit = [
    "livekit-agents>=1.0.0",
    "livekit-plugins-openai>=1.0.0",
    "livekit-plugins-deepgram>=1.0.0",
    "livekit-plugins-silero>=1.0.0",
    "livekit-api>=0.2.0",
]
test = [
"pytest>=8.4.1",
"pytest-asyncio>=1.2.0",
Expand Down
12 changes: 12 additions & 0 deletions ak-py/src/agentkernel/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ class _GmailConfig(BaseModel):
label_filter: str = Field(default="INBOX", description="Gmail label to monitor (e.g., INBOX, UNREAD)")


class _LiveKitConfig(BaseModel):
    # Settings for the LiveKit voice integration. Documented with comments instead of a
    # class docstring so the generated model schema stays exactly as it was.

    # Name of the agent that answers LiveKit voice sessions.
    agent: str = Field(
        description="Default agent to use for LiveKit Voice interactions",
        default="",
    )

    # Connection details and credentials for the LiveKit server.
    url: str = Field(
        description="LiveKit WebSocket URL (e.g., wss://my-project.livekit.cloud)",
        default="",
    )
    api_key: str = Field(
        description="LiveKit API Key",
        default="",
    )
    api_secret: str = Field(
        description="LiveKit API Secret",
        default="",
    )

    # Speech providers; each pattern restricts the field to the listed values.
    stt_provider: str = Field(
        description="Speech-to-Text provider (deepgram or openai)",
        pattern="^(deepgram|openai)$",
        default="deepgram",
    )
    tts_provider: str = Field(
        description="Text-to-Speech provider (openai, elevenlabs, or google)",
        pattern="^(openai|elevenlabs|google)$",
        default="openai",
    )
Comment thread
pulinduvidmal marked this conversation as resolved.


class _MultimodalStorageRedisConfig(BaseModel):
url: str = Field(default="redis://localhost:6379", description="Redis connection URL")
ttl: int = Field(default=604800, description="Attachment TTL in seconds")
Expand Down Expand Up @@ -281,6 +292,7 @@ class AKConfig(YamlBaseSettingsModified):
instagram: _InstagramConfig = Field(description="Instagram Business API related configurations", default_factory=_InstagramConfig)
telegram: _TelegramConfig = Field(description="Telegram Bot related configurations", default_factory=_TelegramConfig)
gmail: _GmailConfig = Field(description="Gmail related configurations", default_factory=_GmailConfig)
livekit: _LiveKitConfig = Field(description="LiveKit Voice related configurations", default_factory=_LiveKitConfig)
multimodal: _MultimodalConfig = Field(description="Multimodal attachment memory configurations", default_factory=_MultimodalConfig)

trace: _TraceConfig = Field(description="Tracing related configurations", default_factory=_TraceConfig)
Expand Down
82 changes: 82 additions & 0 deletions ak-py/src/agentkernel/integration/livekit/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# LiveKit Voice Integration

Agent Kernel supports real-time, ultra-low-latency voice integrations via [LiveKit](https://livekit.io/).

By treating LiveKit as an **Integration**, you can build an agent once (using CrewAI, LangGraph, OpenAI, etc.), equip it with tools and memory via Agent Kernel, and then use LiveKit to allow users to **talk to your agent over a real-time voice call**.

LiveKit handles the WebRTC voice connection, Speech-to-Text (STT), and Text-to-Speech (TTS), while Agent Kernel handles the intelligence, routing, and tools.

## Architecture

When you use the LiveKit integration:
1. The user speaks into their microphone via a LiveKit frontend.
2. LiveKit's **Speech-to-Text (STT)** plugin transcribes the voice into text.
3. The transcribed text is intercepted by our custom `AgentKernelLLM` bridge.
4. The bridge selects the configured agent (using the LiveKit room name as the session ID) and forwards the text to **Agent Kernel** via `AgentService.run(text)`.
5. Agent Kernel's selected agent processes the text and generates a response.
6. The response is sent back to LiveKit's **Text-to-Speech (TTS)** plugin.
7. The TTS plugin synthesizes the voice and streams it back to the user.

## Setup

First, ensure you have installed the LiveKit optional dependencies:

```bash
pip install "agentkernel[livekit]"
```

You will also need:
1. A free account on [LiveKit Cloud](https://cloud.livekit.io/).
2. Your LiveKit server URL and API credentials (`LIVEKIT_URL`, `LIVEKIT_API_KEY`, `LIVEKIT_API_SECRET`).
3. API keys for your preferred STT/TTS providers (e.g., `OPENAI_API_KEY`, `DEEPGRAM_API_KEY`, etc.).
Comment thread
pulinduvidmal marked this conversation as resolved.

Comment thread
pulinduvidmal marked this conversation as resolved.
## Configuration

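In your `config.yaml`, configure which Agent Kernel agent should respond to LiveKit voice interactions, as well as your preferred STT and TTS providers. The `livekit` extra installs only the OpenAI, Deepgram, and Silero plugins; to use `elevenlabs` or `google` for TTS, also install `livekit-plugins-elevenlabs` or `livekit-plugins-google`: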

```yaml
livekit:
agent: "my-voice-agent"
stt_provider: "deepgram" # Options: deepgram, openai
tts_provider: "openai" # Options: openai, elevenlabs, google
url: "wss://your-project-id.livekit.cloud" # Optional, can use LIVEKIT_URL env var
api_key: "your_api_key" # Optional, can use LIVEKIT_API_KEY env var
api_secret: "your_api_secret" # Optional, can use LIVEKIT_API_SECRET env var
```

You can also set these via environment variables:
```bash
export AK_LIVEKIT__AGENT="my-voice-agent"
export AK_LIVEKIT__STT_PROVIDER="openai"
```

## Example Usage

Create a Python script (e.g., `server.py`) that initializes your Agent Kernel agent and starts the REST API. The `AgentLiveKitRequestHandler` will automatically launch the LiveKit background worker alongside your FastAPI server.

```python
import os
import logging
from agentkernel.api import RESTAPI
from agentkernel.openai import OpenAIModule
from agentkernel.livekit import AgentLiveKitRequestHandler
from agents import Agent as OpenAIAgent

logging.basicConfig(level=logging.INFO)

# 1. Define your Agent Kernel Agent
voice_agent = OpenAIAgent(
name="my-voice-agent",
handoff_description="Agent for voice interactions",
instructions="You are a concise voice assistant. Do not use markdown or emojis.",
)

# 2. Register the agent with Agent Kernel
OpenAIModule([voice_agent])

# 3. Start the server with the LiveKit Handler
if __name__ == "__main__":
RESTAPI.run([AgentLiveKitRequestHandler()])
```

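> **Note:** The `AgentLiveKitRequestHandler` exposes a `/livekit/token` API endpoint on your FastAPI server. It takes two query parameters, `room` and `identity` (for example, `GET /livekit/token?room=my-room&identity=alice`), and returns a JSON object of the form `{"token": "<jwt>"}`. Your frontend (e.g., a React application or the LiveKit Agent Console) can call this endpoint to generate access tokens for users joining the voice room.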
16 changes: 16 additions & 0 deletions ak-py/src/agentkernel/integration/livekit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Agent Kernel Integrations with LiveKit

This package contains the Agent Kernel integration implementations for LiveKit Voice Agents.
"""

import importlib.metadata

# Expose the version of the installed "agentkernel" distribution; fall back to a fixed
# placeholder when no installed distribution metadata is found.
try:
    __version__ = importlib.metadata.version("agentkernel")
except importlib.metadata.PackageNotFoundError:
    __version__ = "0.1.0"

# Re-export the REST handler and the LLM bridge as this package's public API.
from .livekit_handler import AgentKernelLLM, AgentLiveKitRequestHandler

__all__ = ["AgentLiveKitRequestHandler", "AgentKernelLLM"]
246 changes: 246 additions & 0 deletions ak-py/src/agentkernel/integration/livekit/livekit_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import asyncio
import logging
import os
from typing import Awaitable, Callable

from fastapi import APIRouter, HTTPException
from livekit import agents, api
from livekit.agents import AgentServer, JobContext, WorkerOptions, WorkerType, llm
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, NOT_GIVEN, APIConnectOptions, NotGivenOr
from livekit.agents.voice import Agent as VoicePipelineAgent
from livekit.agents.voice import AgentSession
from livekit.plugins import deepgram, openai, silero

from ...api import RESTRequestHandler
from ...core import AgentService, Config
from ...core.model import AgentReplyText
Comment thread
pulinduvidmal marked this conversation as resolved.
Outdated

logger = logging.getLogger("ak.integration.livekit")


class AgentKernelLLMStream(llm.LLMStream):
    """
    One-shot LLMStream that hands back a fixed piece of text as a single chunk.

    The first iteration step returns the text wrapped in an assistant ChatChunk;
    every later step ends the iteration.

    NOTE(review): chunks are produced by overriding ``__anext__`` while ``_run`` is a
    no-op — confirm this matches how the installed livekit-agents release expects
    LLMStream subclasses to emit chunks.
    """

    def __init__(self, parent_llm: llm.LLM, text: str, chat_ctx: llm.ChatContext):
        super().__init__(
            parent_llm,
            chat_ctx=chat_ctx,
            tools=[],
            conn_options=DEFAULT_API_CONNECT_OPTIONS,
        )
        # Text still waiting to be handed out; becomes None once it has been returned.
        self._text = text

    async def _run(self) -> None:
        # Intentionally empty: the single chunk is produced directly in __anext__.
        pass

    async def __anext__(self):
        # Take the pending text and clear it in one step so a second call stops iteration.
        pending, self._text = self._text, None
        if pending is None:
            raise StopAsyncIteration

        delta = llm.ChoiceDelta(content=pending, role="assistant")
        return llm.ChatChunk(id="agent_kernel_chunk", delta=delta)


class AgentKernelLLMStreamWrapper(llm.LLMStream):
    """
    LLMStream that answers a single user message through Agent Kernel.

    On the first iteration step it selects the agent/session on its own AgentService,
    runs the agent with the user message and returns the stringified reply as one
    assistant ChatChunk. Any later step ends the iteration.

    NOTE(review): chunks are produced by overriding ``__anext__`` while ``_run`` is a
    no-op — confirm this matches how the installed livekit-agents release expects
    LLMStream subclasses to emit chunks.
    """

    def __init__(self, parent_llm: llm.LLM, chat_ctx: llm.ChatContext, agent_name: str, session_id: str, user_message: str):
        super().__init__(
            parent_llm,
            chat_ctx=chat_ctx,
            tools=[],
            conn_options=DEFAULT_API_CONNECT_OPTIONS,
        )
        self.agent_name = agent_name
        self.session_id = session_id
        self.user_message = user_message
        self._service = AgentService()
        # Flipped to True on the first __anext__ call so the agent runs at most once.
        self._fetched = False

    async def _run(self) -> None:
        # Intentionally empty: the reply is fetched directly in __anext__.
        pass

    async def __anext__(self):
        if self._fetched:
            raise StopAsyncIteration
        self._fetched = True

        service = self._service
        # Bind the service to the requested agent and session before running it.
        service.select(name=self.agent_name, session_id=self.session_id)

        if service.agent:
            try:
                answer = str(await service.run(self.user_message))
            except Exception as e:
                # A failure is logged with its traceback and replaced by a fixed apology
                # instead of being raised to the caller.
                logger.exception(f"Agent Kernel error: {e}")
                answer = "I'm sorry, I encountered an internal error while processing your request."
        else:
            answer = "Error: No agent available to handle this request."

        delta = llm.ChoiceDelta(content=answer, role="assistant")
        return llm.ChatChunk(id="agent_kernel_chunk", delta=delta)


class AgentKernelLLM(llm.LLM):
    """
    LiveKit LLM implementation that answers through the Agent Kernel runtime.

    Instead of calling an LLM API, ``chat`` picks the latest user message out of the
    chat context and returns a stream that routes it to the configured agent.
    """

    def __init__(self, agent_name: str, session_id: str):
        super().__init__()
        self.agent_name = agent_name
        self.session_id = session_id
        # NOTE(review): not referenced anywhere else in this class — the stream wrapper
        # builds its own AgentService; confirm whether this instance is still needed.
        self._service = AgentService()

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        tools: list[llm.Tool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict] = NOT_GIVEN,
    ) -> "AgentKernelLLMStreamWrapper | AgentKernelLLMStream":
        """
        Build the response stream for the most recent user message in ``chat_ctx``.

        Only ``chat_ctx`` is used; the remaining keyword arguments are accepted but
        not read. When no user message with text content is found, a canned
        "I did not hear anything." stream is returned instead.
        """
        # Walk the history newest-first and keep the first non-empty user text.
        user_message = next(
            (entry.text_content for entry in reversed(chat_ctx.messages()) if entry.role == "user" and entry.text_content),
            "",
        )

        if not user_message:
            return AgentKernelLLMStream(self, "I did not hear anything.", chat_ctx)

        logger.debug(f"Received transcribed user speech: {user_message}")

        return AgentKernelLLMStreamWrapper(
            parent_llm=self,
            chat_ctx=chat_ctx,
            agent_name=self.agent_name,
            session_id=self.session_id,
            user_message=user_message,
        )


async def _default_entrypoint(ctx: JobContext):
    """
    Default entrypoint for the LiveKit worker.

    Builds a voice agent for the job's room from the ``livekit`` section of the Agent
    Kernel config: Silero for VAD, the configured STT provider (OpenAI or Deepgram),
    the configured TTS provider (ElevenLabs, Google or OpenAI) and ``AgentKernelLLM``
    as the LLM. The room name is used as the Agent Kernel session id.

    :param ctx: LiveKit job context of the room this job runs in.
    :raises ImportError: if the plugin for the configured TTS provider is not installed.
    """
    logger.info(f"Connecting to room {ctx.room.name}")
    vad = silero.VAD.load()

    # Read the config section once so every setting below comes from the same object.
    livekit_config = Config.get().livekit

    agent_name = livekit_config.agent
    if not agent_name:
        logger.warning("No agent configured for LiveKit interactions. Set 'agent' under 'livekit' in config.yaml.")

    # The OpenAI plugin is already imported at module level, so no local re-import is needed.
    if livekit_config.stt_provider == "openai":
        stt_plugin = openai.STT()
    else:
        stt_plugin = deepgram.STT()

    tts_plugin = _create_tts_plugin(livekit_config.tts_provider)

    agent = VoicePipelineAgent(
        vad=vad,
        stt=stt_plugin,
        llm=AgentKernelLLM(agent_name=agent_name, session_id=ctx.room.name),
        tts=tts_plugin,
        instructions="You are a helpful voice assistant.",
    )

    await ctx.connect(auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY)

    session = AgentSession()
    await session.start(agent, room=ctx.room)


def _create_tts_plugin(provider: str):
    """
    Instantiate the TTS plugin for ``provider``.

    The ElevenLabs and Google plugins are imported on demand; any other value falls
    back to the OpenAI plugin imported at module level.

    :param provider: TTS provider name from the config ("openai", "elevenlabs" or "google").
    :raises ImportError: if the on-demand plugin import fails, with a message naming
        the package to install.
    """
    if provider == "elevenlabs":
        try:
            from livekit.plugins import elevenlabs
        except ImportError as exc:
            raise ImportError(
                "TTS provider 'elevenlabs' requires the 'livekit-plugins-elevenlabs' package, which is not installed."
            ) from exc
        return elevenlabs.TTS()

    if provider == "google":
        try:
            from livekit.plugins import google
        except ImportError as exc:
            raise ImportError(
                "TTS provider 'google' requires the 'livekit-plugins-google' package, which is not installed."
            ) from exc
        return google.TTS()

    return openai.TTS()


class AgentLiveKitRequestHandler(RESTRequestHandler):
    """
    RESTRequestHandler for LiveKit Integration.

    This handler achieves two things:
    1. Exposes a REST API endpoint (`/livekit/token`) to generate access tokens for frontend clients.
    2. Runs a LiveKit worker as a background asyncio task that is started by the router's
       startup event and cancelled by its shutdown event.
    """

    def __init__(self, entrypoint_fnc: Callable[[JobContext], Awaitable[None]] | None = None):
        """
        :param entrypoint_fnc: Coroutine function the LiveKit worker calls for each job.
            Defaults to ``_default_entrypoint``.
        """
        self._log = logging.getLogger("ak.api.livekit")
        self._entrypoint = entrypoint_fnc or _default_entrypoint
        self._worker_task: asyncio.Task | None = None
        # Explicit flag so the startup hook can tell whether the worker was already launched.
        self._worker_started = False

        # Config values win; the standard LIVEKIT_* environment variables are the fallback,
        # so the token endpoint and the worker resolve the same credentials.
        livekit_config = Config.get().livekit
        self.url = livekit_config.url or os.environ.get("LIVEKIT_URL", "")
        self.api_key = livekit_config.api_key or os.environ.get("LIVEKIT_API_KEY", "")
        self.api_secret = livekit_config.api_secret or os.environ.get("LIVEKIT_API_SECRET", "")

    def _log_worker_exit(self, task: asyncio.Task) -> None:
        """Log a worker task that ended with an exception so the failure is not dropped silently."""
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._log.error("LiveKit background worker stopped unexpectedly: %s", exc, exc_info=exc)

    def get_router(self) -> APIRouter:
        """
        Build the ``/livekit`` router: worker startup/shutdown hooks plus the token endpoint.

        :return: APIRouter with the ``/livekit`` prefix.
        """
        router = APIRouter(prefix="/livekit", tags=["LiveKit Integration"])

        @router.on_event("startup")
        async def startup_event():
            # Launch the worker only once per handler, even if the hook fires again.
            if self._worker_started:
                return
            self._worker_started = True

            self._log.info("Starting up LiveKit Background Worker")

            # Setup worker options; unset values are left out so WorkerOptions applies its own defaults
            kwargs = {}
            if self.url:
                kwargs["ws_url"] = self.url
            if self.api_key:
                kwargs["api_key"] = self.api_key
            if self.api_secret:
                kwargs["api_secret"] = self.api_secret

            # Set port to 0 to assign a random port, avoiding conflicts on 8081
            kwargs["port"] = 0

            # We initialize the AgentServer and start it as an asyncio task
            worker_opts = WorkerOptions(agent_name="agent-kernel-worker", entrypoint_fnc=self._entrypoint, worker_type=WorkerType.ROOM, **kwargs)

            server = AgentServer.from_server_options(worker_opts)
            self._worker_task = asyncio.create_task(server.run())
            self._worker_task.add_done_callback(self._log_worker_exit)

        @router.on_event("shutdown")
        async def shutdown_event():
            # Stop the background worker with the application instead of leaving the task pending.
            task, self._worker_task = self._worker_task, None
            self._worker_started = False
            if task is None or task.done():
                return

            self._log.info("Stopping LiveKit Background Worker")
            task.cancel()
            # return_exceptions=True absorbs the cancellation (or a late failure) of the worker task.
            await asyncio.gather(task, return_exceptions=True)

        @router.get("/token")
        def get_token(room: str, identity: str):
            """
            Generates a LiveKit Access Token for a frontend client to join the voice room.

            Responds with 500 when no API key/secret is available and with 400 when
            `room` or `identity` is blank.
            """
            # NOTE(review): no authentication is applied in this handler — confirm the hosting
            # REST API protects this route, since it issues join tokens for any requested room
            # and identity.
            if not self.api_key or not self.api_secret:
                raise HTTPException(
                    status_code=500,
                    detail=(
                        "LiveKit API Key or Secret not configured in config.yaml under 'livekit' "
                        "(or via the LIVEKIT_API_KEY / LIVEKIT_API_SECRET environment variables)"
                    ),
                )

            if not room.strip() or not identity.strip():
                raise HTTPException(status_code=400, detail="Both 'room' and 'identity' must be non-empty")

            token = api.AccessToken(self.api_key, self.api_secret)
            token.with_identity(identity)
            token.with_name(identity)
            token.with_grants(
                api.VideoGrants(
                    room_join=True,
                    room=room,
                )
            )
            return {"token": token.to_jwt()}

        return router
8 changes: 8 additions & 0 deletions ak-py/src/agentkernel/livekit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Convenience module that re-exports the LiveKit integration package's public names.
import importlib.metadata

# Expose the version of the installed "agentkernel" distribution; fall back to a fixed
# placeholder when no installed distribution metadata is found.
try:
    __version__ = importlib.metadata.version("agentkernel")
except importlib.metadata.PackageNotFoundError:
    __version__ = "0.1.0"

# The star import brings in only the names the integration package lists in its __all__.
from .integration.livekit import *
Loading
Loading