Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions backend/agents/create_agent_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@
allow_memory_search: bool = True,
version_no: int = 0,
override_model_id: int | None = None,
conversation_id: int = None,
):
agent_info = search_agent_info_by_agent_id(
agent_id=agent_id, tenant_id=tenant_id, version_no=version_no)
Expand All @@ -331,13 +332,14 @@
allow_memory_search=allow_memory_search,
version_no=sub_agent_version_no,
override_model_id=None,
conversation_id=None,

Check failure on line 335 in backend/agents/create_agent_info.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change this argument; Function "create_agent_config" expects a different type

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8VGOWwJzk61Ijpp&open=AZ6wr8VGOWwJzk61Ijpp&pullRequest=3216
)
managed_agents.append(sub_agent_config)

# create external A2A agents (synchronous function, no await needed)
external_a2a_agents = _get_external_a2a_agents(agent_id, tenant_id, version_no)

tool_list = await create_tool_config_list(agent_id, tenant_id, user_id, version_no=version_no)
tool_list = await create_tool_config_list(agent_id, tenant_id, user_id, version_no=version_no, conversation_id=conversation_id)

# Build system prompt: prioritize segmented fields, fallback to original prompt field if not available
duty_prompt = agent_info.get("duty_prompt", "")
Expand Down Expand Up @@ -562,7 +564,7 @@
return agent_config


async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int = 0):
async def create_tool_config_list(agent_id, tenant_id, user_id, version_no: int = 0, conversation_id: int = None):

Check failure on line 567 in backend/agents/create_agent_info.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 40 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8VGOWwJzk61Ijpq&open=AZ6wr8VGOWwJzk61Ijpq&pullRequest=3216
# create tool
tool_config_list = []
langchain_tools = await discover_langchain_tools()
Expand Down Expand Up @@ -665,6 +667,17 @@
"storage_client": minio_client,
"validate_url_access": lambda urls: validate_urls_access(urls, user_id)
}
elif tool_config.class_name == "ScheduledTaskTool":
from database.scheduled_task_db import create_scheduled_task, query_tasks_by_agent, cancel_task
tool_config.metadata = {
"db_create": create_scheduled_task,
"db_list": query_tasks_by_agent,
"db_cancel": cancel_task,
"agent_id": agent_id,
"tenant_id": tenant_id,
"user_id": user_id,
"conversation_id": conversation_id,
}

tool_config_list.append(tool_config)

Expand Down Expand Up @@ -929,6 +942,7 @@
is_debug: bool = False,
override_version_no: int | None = None,
override_model_id: int | None = None,
conversation_id: int = None,
):
# Determine which version_no to use based on is_debug flag
# If is_debug=false, use the current published version (current_version_no)
Expand Down Expand Up @@ -957,6 +971,7 @@
"last_user_query": final_query,
"allow_memory_search": allow_memory_search,
"version_no": version_no,
"conversation_id": conversation_id,
}
if override_model_id is not None:
create_config_kwargs["override_model_id"] = override_model_id
Expand Down
51 changes: 51 additions & 0 deletions backend/apps/conversation_management_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Optional

from fastapi import APIRouter, Header, HTTPException, Request
from starlette.responses import JSONResponse

from consts.model import (
ConversationRequest,
Expand All @@ -18,6 +19,7 @@
generate_conversation_title_service,
get_conversation_history_service,
get_conversation_list_service,
get_new_messages_service,
get_sources_service,
rename_conversation_service,
update_message_opinion_service, get_message_id_by_index_impl,
Expand Down Expand Up @@ -240,3 +242,52 @@
except Exception as e:
logging.error(f"Failed to get message ID: {str(e)}")
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))


@router.get("/{conversation_id}/new_messages", response_model=Dict[str, Any])
async def check_new_messages_endpoint(conversation_id: int, since_index: int = 0, authorization: Optional[str] = Header(None)):

Check warning on line 248 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "Annotated" type hints for FastAPI dependency injection

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpk&open=AZ6wr8UYOWwJzk61Ijpk&pullRequest=3216
"""
Lightweight polling: check if new messages exist for a single conversation.

Args:
conversation_id: Conversation ID
since_index: Last known message index on the client side
authorization: Authorization header

Returns:
Dict with has_new, max_index, since_index
"""
try:
user_id, tenant_id = get_current_user_id(authorization)

Check warning on line 261 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "tenant_id" with "_".

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpj&open=AZ6wr8UYOWwJzk61Ijpj&pullRequest=3216
result = get_new_messages_service(conversation_id, user_id, since_index)
return JSONResponse(status_code=HTTPStatus.OK, content=result)
except Exception as e:
logging.error(f"Failed to check new messages: {str(e)}")

Check failure on line 265 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "logging.exception()" instead.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpl&open=AZ6wr8UYOWwJzk61Ijpl&pullRequest=3216
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))


@router.post("/batch_new_messages", response_model=Dict[str, Any])
async def batch_check_new_messages_endpoint(request: Dict[str, Any], authorization: Optional[str] = Header(None)):

Check warning on line 270 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "Annotated" type hints for FastAPI dependency injection

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpn&open=AZ6wr8UYOWwJzk61Ijpn&pullRequest=3216
"""
Batch check for new messages across multiple conversations.

Args:
request: Body with "checks" list of {"conversation_id": int, "since_index": int}
authorization: Authorization header

Returns:
Dict mapping conversation_id to {has_new, max_index, since_index}
"""
try:
user_id, tenant_id = get_current_user_id(authorization)

Check warning on line 282 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace the unused local variable "tenant_id" with "_".

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpm&open=AZ6wr8UYOWwJzk61Ijpm&pullRequest=3216
checks = request.get("checks", [])
results = {}
for check in checks:
cid = check.get("conversation_id")
since = check.get("since_index", 0)
if cid is not None:
results[str(cid)] = get_new_messages_service(cid, user_id, since)
return JSONResponse(status_code=HTTPStatus.OK, content={"results": results})
except Exception as e:
logging.error(f"Failed to batch check new messages: {str(e)}")

Check failure on line 292 in backend/apps/conversation_management_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "logging.exception()" instead.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ6wr8UYOWwJzk61Ijpo&open=AZ6wr8UYOWwJzk61Ijpo&pullRequest=3216
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
11 changes: 11 additions & 0 deletions backend/apps/runtime_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from apps.file_management_app import file_management_runtime_router as file_management_router
from apps.skill_app import skill_creator_router
from middleware.exception_handler import ExceptionHandlerMiddleware
from services.scheduled_task_scheduler import scheduled_task_scheduler

# Create logger instance
logger = logging.getLogger("runtime_app")
Expand All @@ -24,3 +25,13 @@
app.include_router(file_management_router)
app.include_router(voice_router)
app.include_router(skill_creator_router)


@app.on_event("startup")
async def start_scheduled_task_scheduler():
scheduled_task_scheduler.start()


@app.on_event("shutdown")
async def stop_scheduled_task_scheduler():
scheduled_task_scheduler.stop()
11 changes: 11 additions & 0 deletions backend/database/conversation_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,17 @@ def get_source_searches_by_conversation(conversation_id: int, user_id: Optional[
return [as_dict(record) for record in search_records]


def get_max_message_index(conversation_id: int) -> int:
"""Return the maximum message_index for a conversation, or -1 if empty."""
with get_db_session() as session:
conversation_id = int(conversation_id)
stmt = select(func.coalesce(func.max(ConversationMessage.message_index), -1)).where(
ConversationMessage.conversation_id == conversation_id,
ConversationMessage.delete_flag == 'N',
)
return session.execute(stmt).scalar()


def get_message(message_id: int, user_id: Optional[str] = None) -> Dict[str, Any]:
"""
Get message details by message ID
Expand Down
29 changes: 29 additions & 0 deletions backend/database/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,35 @@ class A2AMessage(SimpleTableBase):
timezone=False), server_default=func.now(), doc="Message creation timestamp")


class ScheduledTaskRecord(TableBase):
"""
Scheduled task records for deferred / recurring agent execution.
"""
__tablename__ = "scheduled_tasks_t"
__table_args__ = (
Index("ix_scheduled_task_status_next_fire", "status", "next_fire_time"),
Index("ix_scheduled_task_agent_delete", "agent_id", "delete_flag"),
{"schema": SCHEMA},
)

task_id = Column(Integer, Sequence("scheduled_tasks_t_task_id_seq", schema=SCHEMA),
primary_key=True, nullable=False, autoincrement=True, doc="Primary key")
task_uuid = Column(String(36), unique=True, nullable=False, doc="Unique task identifier (UUID)")
task_name = Column(String(200), doc="Human-readable task name")
task_prompt = Column(Text, nullable=False, doc="The prompt to execute when the task fires")
task_type = Column(String(10), nullable=False, doc="Task type: oneshot or cron")
cron_expression = Column(String(100), doc="Cron expression for recurring tasks")
delay_seconds = Column(Integer, doc="Delay in seconds for oneshot tasks")
status = Column(String(20), default="pending", doc="Task status: pending, fired, cancelled, error")
next_fire_time = Column(TIMESTAMP(timezone=False), doc="Next scheduled execution time")
fire_count = Column(Integer, default=0, doc="Number of times this task has fired")
max_fires = Column(Integer, nullable=True, doc="Maximum number of fires (NULL = unlimited)")
agent_id = Column(Integer, nullable=False, doc="Agent ID that owns this task")
conversation_id = Column(Integer, nullable=True, doc="Conversation ID associated with this task")
tenant_id = Column(String(100), nullable=False, doc="Tenant ID for multi-tenancy isolation")
user_id = Column(String(100), nullable=False, doc="User ID who created this task")


class A2AArtifact(SimpleTableBase):
"""
A2A artifacts. Stores the output/artifacts produced by a task.
Expand Down
82 changes: 82 additions & 0 deletions backend/database/scheduled_task_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import logging
from datetime import datetime
from typing import Optional

from sqlalchemy import select, update

from .client import as_dict, get_db_session
from .db_models import ScheduledTaskRecord

logger = logging.getLogger("scheduled_task_db")


def create_scheduled_task(data: dict) -> dict:
"""Insert a new scheduled task record and return it as a dict."""
with get_db_session() as session:
record = ScheduledTaskRecord(**data)
session.add(record)
session.flush()
return as_dict(record)


def query_tasks_by_agent(agent_id: int, tenant_id: str, user_id: str = None) -> list[dict]:
"""Return pending tasks for a given agent and tenant, optionally filtered by user."""
with get_db_session() as session:
stmt = select(ScheduledTaskRecord).where(
ScheduledTaskRecord.agent_id == agent_id,
ScheduledTaskRecord.tenant_id == tenant_id,
ScheduledTaskRecord.status == "pending",
ScheduledTaskRecord.delete_flag == "N",
)
if user_id:
stmt = stmt.where(ScheduledTaskRecord.user_id == user_id)
stmt = stmt.order_by(ScheduledTaskRecord.task_id.desc())
records = session.scalars(stmt).all()
return [as_dict(r) for r in records]


def query_pending_tasks_due(now: datetime) -> list[dict]:
"""Return all pending tasks whose next_fire_time <= now (global, no tenant filter)."""
with get_db_session() as session:
stmt = select(ScheduledTaskRecord).where(
ScheduledTaskRecord.status == "pending",
ScheduledTaskRecord.next_fire_time <= now,
ScheduledTaskRecord.delete_flag == "N",
)
records = session.scalars(stmt).all()
return [as_dict(r) for r in records]


def cancel_task(task_uuid: str, agent_id: int, tenant_id: str, user_id: str = None) -> bool:
"""Soft-cancel a task. Optionally restrict to a specific user for isolation."""
with get_db_session() as session:
conditions = [
ScheduledTaskRecord.task_uuid == task_uuid,
ScheduledTaskRecord.agent_id == agent_id,
ScheduledTaskRecord.tenant_id == tenant_id,
ScheduledTaskRecord.delete_flag == "N",
ScheduledTaskRecord.status == "pending",
]
if user_id:
conditions.append(ScheduledTaskRecord.user_id == user_id)
stmt = (
update(ScheduledTaskRecord)
.where(*conditions)
.values(status="cancelled")
)
result = session.execute(stmt)
return result.rowcount > 0


def update_task_status(task_uuid: str, updates: dict) -> None:
"""Update arbitrary columns on a task record identified by task_uuid."""
with get_db_session() as session:
stmt = (
update(ScheduledTaskRecord)
.where(
ScheduledTaskRecord.task_uuid == task_uuid,
ScheduledTaskRecord.delete_flag == "N",
)
.values(**updates)
)
session.execute(stmt)
11 changes: 11 additions & 0 deletions backend/services/conversation_management_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ def delete_conversation_service(conversation_id: int, user_id: str) -> bool:
raise Exception(str(e))


def get_new_messages_service(conversation_id: int, user_id: str, since_index: int) -> Dict[str, Any]:
"""Lightweight polling: check for new messages after since_index."""
from database.conversation_db import get_conversation, get_max_message_index
conv = get_conversation(conversation_id, user_id)
if conv is None:
return {"has_new": False, "max_index": since_index, "since_index": since_index}
max_idx = get_max_message_index(conversation_id)
has_new = max_idx > since_index
return {"has_new": has_new, "max_index": max_idx, "since_index": since_index}


def get_conversation_history_service(conversation_id: int, user_id: str) -> List[Dict[str, Any]]:
"""
Get complete history of specified conversation
Expand Down
Loading