From 08e6533aef98f41386eb9947bacfd96765e22f0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E8=A1=8D=E5=8D=8E?= Date: Mon, 8 Dec 2025 19:29:57 +0800 Subject: [PATCH 1/3] add dynamic workers num and lock --- oxygent/oxy/function_tools/function_hub.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/oxygent/oxy/function_tools/function_hub.py b/oxygent/oxy/function_tools/function_hub.py index 7328a7a3..4236eb01 100644 --- a/oxygent/oxy/function_tools/function_hub.py +++ b/oxygent/oxy/function_tools/function_hub.py @@ -8,6 +8,8 @@ import asyncio import functools import concurrent.futures +import os +import threading from pydantic import Field @@ -34,12 +36,17 @@ def __init__(self, **data): """Initialize the FunctionHub with thread pool support.""" super().__init__(**data) self._thread_pool = None # Private attribute for thread pool + self._thread_pool_lock = threading.Lock() # Lock for thread pool initialization @property def thread_pool(self): - """Lazy initialization of thread pool.""" + """Lazy initialization of thread pool with thread safety.""" if self._thread_pool is None: - self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4) + with self._thread_pool_lock: + if self._thread_pool is None: # Double-checked locking pattern + cpu_count = os.cpu_count() or 1 + max_workers = min(max(cpu_count * 3, 4), 32) + self._thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) return self._thread_pool async def init(self): From 719cc3d4bf82d72ed8e47dfbb78f20e024949308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E8=A1=8D=E5=8D=8E?= Date: Mon, 8 Dec 2025 20:27:24 +0800 Subject: [PATCH 2/3] add log and cleanup smoothly --- oxygent/oxy/function_tools/function_hub.py | 75 +++++++++++++++++++++- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/oxygent/oxy/function_tools/function_hub.py b/oxygent/oxy/function_tools/function_hub.py index 4236eb01..298022ce 100644 --- a/oxygent/oxy/function_tools/function_hub.py +++ b/oxygent/oxy/function_tools/function_hub.py @@ -10,12 +10,15 @@ import concurrent.futures import os import threading +import logging from pydantic import Field from ..base_tool import BaseTool from .function_tool import FunctionTool +logger = logging.getLogger(__name__) + class FunctionHub(BaseTool): """Central hub for registering and managing Python functions as tools. @@ -115,7 +118,73 @@ async def async_func(*args, **kwargs): return decorator async def cleanup(self): - """Clean up resources, including the thread pool.""" + """Clean up resources, including the thread pool. + + This method ensures proper shutdown of the thread pool to prevent + resource leaks and dangling threads. It waits for all pending tasks + to complete before shutting down. + + The cleanup process is idempotent - multiple calls are safe. + """ if self._thread_pool: - self._thread_pool.shutdown(wait=True) - self._thread_pool = None + try: + logger.info(f"FunctionHub {self.name}: Starting thread pool cleanup...") + # Wait for all pending tasks to complete + self._thread_pool.shutdown(wait=True) + logger.info(f"FunctionHub {self.name}: Thread pool shutdown completed") + except Exception as e: + logger.error(f"FunctionHub {self.name}: Error during thread pool cleanup: {e}") + # Even if shutdown fails, ensure _thread_pool is set to None + # to prevent further usage and potential memory leaks + finally: + self._thread_pool = None + + def is_thread_pool_active(self): + """Check if the thread pool is currently active. + + Returns: + bool: True if thread pool is initialized and active, False otherwise. + """ + return self._thread_pool is not None + + def get_thread_pool_info(self): + """Get information about the current thread pool. + + Returns: + dict: Thread pool information including worker count and status, + or None if pool is not initialized. + """ + if self._thread_pool is None: + return None + + try: + # ThreadPoolExecutor doesn't expose internal state directly, + # but we can check if it's shut down + return { + "initialized": True, + "workers": getattr(self._thread_pool, '_max_workers', 'unknown'), + "shutdown": getattr(self._thread_pool, '_shutdown', False) + } + except Exception: + return {"initialized": True, "status": "active"} + + async def __aenter__(self): + """Async context manager entry point. + + Returns: + FunctionHub: Self for use in async with statement + """ + await self.init() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit point. + + Ensures cleanup is performed even if exceptions occur during usage. + + Args: + exc_type: Exception type if an exception occurred + exc_val: Exception value if an exception occurred + exc_tb: Exception traceback if an exception occurred + """ + await self.cleanup() From 5435ed85cf65b2f2c0c5c21a13f7f5795f3443ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E8=A1=8D=E5=8D=8E?= Date: Thu, 18 Dec 2025 13:16:59 +0800 Subject: [PATCH 3/3] feat: update local redis implementation and add unit test --- oxygent/databases/db_redis/local_redis.py | 172 ++++++++++++- test/unittest/test_local_redis.py | 294 +++++++++++++++++++++- 2 files changed, 463 insertions(+), 3 deletions(-) diff --git a/oxygent/databases/db_redis/local_redis.py b/oxygent/databases/db_redis/local_redis.py index dc644eae..02f77684 100644 --- a/oxygent/databases/db_redis/local_redis.py +++ b/oxygent/databases/db_redis/local_redis.py @@ -9,7 +9,8 @@ import json import time from collections import deque -from typing import Dict, Optional, Union +from typing import Dict, Optional, Union, List, Tuple +import bisect from ...config import Config @@ -26,10 +27,12 @@ class LocalRedis: - Automatic expiration handling with TTL support - List operations with configurable size limits - Value type validation and conversion + - Sorted set (zset) operations with score-based ordering """ def __init__(self, *, yield_on_ops: bool = True): self.data: Dict[str, deque] = {} + self.zsets: Dict[str, Tuple[List[float], List[str]]] = {} # (scores, members) self.expiry: Dict[str, float] = {} self.default_expire_time = Config.get_redis_expire_time() self.default_list_max_size = Config.get_redis_max_size() @@ -143,9 +146,174 @@ def _check_expiry(self, key: str): key: The key to check for expiration """ if key in self.expiry and time.time() > self.expiry[key]: - del self.data[key] + if key in self.data: + del self.data[key] + if key in self.zsets: + del self.zsets[key] del self.expiry[key] + async def zadd(self, key: str, mapping: Dict[str, float], ex: Optional[int] = None) -> int: + """Add one or more members to a sorted set, or update their score if they already exist. + + Args: + key: The sorted set key + mapping: Dictionary of member-score pairs + ex: Expiration time in seconds (default: 1 day) + + Returns: + int: Number of new members added (excluding updated scores) + """ + if ex is None: + ex = self.default_expire_time + + if key not in self.zsets: + self.zsets[key] = ([], []) + self.expiry[key] = time.time() + ex + + scores, members = self.zsets[key] + added = 0 + + for member, score in mapping.items(): + # Check if member exists + if member in members: + idx = members.index(member) + # Always update: remove old position and re-insert at new position + del scores[idx] + del members[idx] + insert_pos = bisect.bisect_left(scores, score) + scores.insert(insert_pos, score) + members.insert(insert_pos, member) + else: + # Add new member + insert_pos = bisect.bisect_left(scores, score) + scores.insert(insert_pos, score) + members.insert(insert_pos, member) + added += 1 + + self.expiry[key] = time.time() + ex + + if self._yield_on_ops: + await asyncio.sleep(0) + return added + + async def zrange( + self, + key: str, + start: int, + stop: int, + withscores: bool = False + ) -> Union[List[str], List[Tuple[str, float]]]: + """Return a range of members in a sorted set, by index. + + Args: + key: The sorted set key + start: Starting index (0-based) + stop: Ending index (inclusive, -1 for last) + withscores: Whether to return scores with members + + Returns: + List of members or list of (member, score) tuples + """ + self._check_expiry(key) + if key not in self.zsets: + return [] + + scores, members = self.zsets[key] + # Handle negative indices + if stop < 0: + stop = len(members) + stop + if start < 0: + start = len(members) + start + + result = members[start:stop+1] + if withscores: + result_scores = scores[start:stop+1] + return list(zip(result, result_scores)) + return result + + async def zrem(self, key: str, *members: str) -> int: + """Remove one or more members from a sorted set. + + Args: + key: The sorted set key + *members: Members to remove + + Returns: + int: Number of members removed + """ + self._check_expiry(key) + if key not in self.zsets: + return 0 + + scores, members_list = self.zsets[key] + removed = 0 + + for member in members: + if member in members_list: + idx = members_list.index(member) + del scores[idx] + del members_list[idx] + removed += 1 + + if not members_list: # If set is empty, remove it + del self.zsets[key] + del self.expiry[key] + + if self._yield_on_ops: + await asyncio.sleep(0) + return removed + + async def zincrby(self, key: str, increment: float, member: str, ex: Optional[int] = None) -> float: + """Increment the score of a member in a sorted set. + + Args: + key: The sorted set key + increment: The amount to increment the score by + member: The member to increment + ex: Expiration time in seconds (default: 1 day) + + Returns: + float: The new score of the member + """ + if ex is None: + ex = self.default_expire_time + + self._check_expiry(key) + + # Initialize the sorted set if it doesn't exist + if key not in self.zsets: + self.zsets[key] = ([], []) + self.expiry[key] = time.time() + ex + else: + # Update expiry even if key exists + self.expiry[key] = time.time() + ex + + scores, members_list = self.zsets[key] + + # Check if member exists + if member in members_list: + idx = members_list.index(member) + old_score = scores[idx] + new_score = old_score + increment + + # Always remove and re-insert to maintain sorted order + del scores[idx] + del members_list[idx] + insert_pos = bisect.bisect_left(scores, new_score) + scores.insert(insert_pos, new_score) + members_list.insert(insert_pos, member) + else: + # Member doesn't exist, add it with the increment as score + new_score = increment + insert_pos = bisect.bisect_left(scores, new_score) + scores.insert(insert_pos, new_score) + members_list.insert(insert_pos, member) + + if self._yield_on_ops: + await asyncio.sleep(0) + + return new_score + async def close(self): # This method is async to maintain compatibility with the Redis interface # Async for interface compatibility diff --git a/test/unittest/test_local_redis.py b/test/unittest/test_local_redis.py index ef007c01..e9d79951 100644 --- a/test/unittest/test_local_redis.py +++ b/test/unittest/test_local_redis.py @@ -3,7 +3,7 @@ """ import time - +import asyncio import pytest from oxygent.databases.db_redis.local_redis import LocalRedis @@ -77,3 +77,295 @@ async def test_expiry(redis): @pytest.mark.asyncio async def test_close(redis): assert await redis.close() is None + +@pytest.fixture +def redis(): + """Create a LocalRedis instance for each test.""" + return LocalRedis(yield_on_ops=True) + + +@pytest.mark.asyncio +async def test_zadd_new_members(redis): + """Test adding new members to a sorted set.""" + result = await redis.zadd("myzset", {"one": 1, "two": 2, "three": 3}) + assert result == 3 # 3 new members added + + # Verify members are in correct order + members = await redis.zrange("myzset", 0, -1) + assert members == ["one", "two", "three"] + + +@pytest.mark.asyncio +async def test_zadd_update_existing_members(redis): + """Test updating existing members in a sorted set.""" + # Add initial members + await redis.zadd("myzset", {"one": 1, "two": 2, "three": 3}) + + # Update a member's score + result = await redis.zadd("myzset", {"two": 5}) + assert result == 0 # No new members added, only updated + + # Verify order is updated + members = await redis.zrange("myzset", 0, -1) + assert members == ["one", "three", "two"] + + +@pytest.mark.asyncio +async def test_zadd_with_withscores(redis): + """Test zadd with scores returned.""" + await redis.zadd("myzset", {"a": 10.5, "b": 20.3, "c": 5.1}) + + result = await redis.zrange("myzset", 0, -1, withscores=True) + assert result == [("c", 5.1), ("a", 10.5), ("b", 20.3)] + + +@pytest.mark.asyncio +async def test_zrange_with_indices(redis): + """Test zrange with different index ranges.""" + await redis.zadd("myzset", {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}) + + # Range from 0 to 2 + result = await redis.zrange("myzset", 0, 2) + assert result == ["a", "b", "c"] + + # Range with negative indices + result = await redis.zrange("myzset", -2, -1) + assert result == ["d", "e"] + + # Single element + result = await redis.zrange("myzset", 0, 0) + assert result == ["a"] + + +@pytest.mark.asyncio +async def test_zrange_nonexistent_key(redis): + """Test zrange on non-existent key returns empty list.""" + result = await redis.zrange("nonexistent", 0, -1) + assert result == [] + + +@pytest.mark.asyncio +async def test_zrem_single_member(redis): + """Test removing a single member from sorted set.""" + await redis.zadd("myzset", {"a": 1, "b": 2, "c": 3}) + + removed = await redis.zrem("myzset", "b") + assert removed == 1 + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "c"] + + +@pytest.mark.asyncio +async def test_zrem_multiple_members(redis): + """Test removing multiple members from sorted set.""" + await redis.zadd("myzset", {"a": 1, "b": 2, "c": 3, "d": 4}) + + removed = await redis.zrem("myzset", "b", "d") + assert removed == 2 + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "c"] + + +@pytest.mark.asyncio +async def test_zrem_nonexistent_member(redis): + """Test removing non-existent member.""" + await redis.zadd("myzset", {"a": 1, "b": 2}) + + removed = await redis.zrem("myzset", "nonexistent") + assert removed == 0 + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "b"] + + +@pytest.mark.asyncio +async def test_zrem_cleans_empty_set(redis): + """Test that zrem removes empty sets from storage.""" + await redis.zadd("myzset", {"a": 1}) + + await redis.zrem("myzset", "a") + + # Verify key is completely removed + assert "myzset" not in redis.zsets + assert "myzset" not in redis.expiry + + +@pytest.mark.asyncio +async def test_zincrby_existing_member(redis): + """Test incrementing score of existing member.""" + await redis.zadd("myzset", {"a": 10, "b": 20, "c": 30}) + + new_score = await redis.zincrby("myzset", 5, "b") + assert new_score == 25 + + # Verify order is updated: b's score changes from 20 to 25 + # So order should be a(10), b(25), c(30) + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "b", "c"] + + +@pytest.mark.asyncio +async def test_zincrby_new_member(redis): + """Test incrementing score of non-existent member (creates it).""" + await redis.zadd("myzset", {"a": 10}) + + new_score = await redis.zincrby("myzset", 15, "b") + assert new_score == 15 + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "b"] + + +@pytest.mark.asyncio +async def test_zincrby_negative_increment(redis): + """Test decrementing score with negative increment.""" + await redis.zadd("myzset", {"a": 10, "b": 20, "c": 30}) + + new_score = await redis.zincrby("myzset", -5, "b") + assert new_score == 15 + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "b", "c"] + + +@pytest.mark.asyncio +async def test_zincrby_on_empty_set(redis): + """Test zincrby on non-existent key creates the set.""" + new_score = await redis.zincrby("myzset", 42, "member") + assert new_score == 42 + + members = await redis.zrange("myzset", 0, -1, withscores=True) + assert members == [("member", 42)] + + +@pytest.mark.asyncio +async def test_zincrby_maintains_order(redis): + """Test that zincrby properly maintains sorted order.""" + await redis.zadd("myzset", {"a": 1, "b": 2, "c": 3, "d": 4}) + + # Move 'a' to become the largest + await redis.zincrby("myzset", 10, "a") + + members = await redis.zrange("myzset", 0, -1, withscores=True) + assert members == [("b", 2), ("c", 3), ("d", 4), ("a", 11)] + + +@pytest.mark.asyncio +async def test_zincrby_multiple_operations(redis): + """Test multiple zincrby operations in sequence.""" + await redis.zadd("myzset", {"a": 0, "b": 0}) + + # Simulate API key pool: increment on task start, decrement on task end + await redis.zincrby("myzset", 1, "a") # a: 1, b: 0 + assert (await redis.zrange("myzset", 0, 0))[0] == "b" # b is least loaded + + await redis.zincrby("myzset", 2, "b") # a: 1, b: 2 + assert (await redis.zrange("myzset", 0, 0))[0] == "a" # a is least loaded + + await redis.zincrby("myzset", -1, "a") # a: 0, b: 2 + assert (await redis.zrange("myzset", 0, 0))[0] == "a" # a is least loaded + + +@pytest.mark.asyncio +async def test_api_key_pool_simulation(redis): + """Test a realistic API key pool load balancing scenario.""" + # Initialize pool with 3 API keys + api_keys = {"key1", "key2", "key3"} + await redis.zadd("api_pool", {key: 0 for key in api_keys}) + + # Simulate task assignments + # Get least loaded key + least_loaded = (await redis.zrange("api_pool", 0, 0))[0] + assert least_loaded in api_keys + + # Increment when task starts + await redis.zincrby("api_pool", 1, least_loaded) + + # Get new least loaded key + new_least = (await redis.zrange("api_pool", 0, 0))[0] + assert new_least in api_keys + assert new_least != least_loaded # Should be different + + # Increment again + await redis.zincrby("api_pool", 1, new_least) + + # Get state with scores + state = await redis.zrange("api_pool", 0, -1, withscores=True) + assert len(state) == 3 + + # Verify scores are correct + scores = {member: score for member, score in state} + assert scores[least_loaded] == 1 + assert scores[new_least] == 1 + + +@pytest.mark.asyncio +async def test_expiry_in_zadd(redis): + """Test expiry setting in zadd.""" + import time + + await redis.zadd("myzset", {"a": 1}, ex=1) + + # Should exist + members = await redis.zrange("myzset", 0, -1) + assert len(members) == 1 + + # Wait for expiry + await asyncio.sleep(1.1) + + # Should be expired, get empty result + members = await redis.zrange("myzset", 0, -1) + assert len(members) == 0 + +@pytest.mark.asyncio +async def test_negative_scores(redis): + """Test sorted set with negative scores.""" + await redis.zadd("myzset", {"a": -10, "b": 0, "c": 10, "d": -5}) + + members = await redis.zrange("myzset", 0, -1, withscores=True) + assert members == [("a", -10), ("d", -5), ("b", 0), ("c", 10)] + + +@pytest.mark.asyncio +async def test_float_scores(redis): + """Test sorted set with floating point scores.""" + await redis.zadd("myzset", { + "a": 1.5, + "b": 2.7, + "c": 1.2, + "d": 3.1 + }) + + members = await redis.zrange("myzset", 0, -1, withscores=True) + assert members == [("c", 1.2), ("a", 1.5), ("b", 2.7), ("d", 3.1)] + + +@pytest.mark.asyncio +async def test_score_equality_update(redis): + """Test that updating with same score doesn't change order.""" + await redis.zadd("myzset", {"a": 1, "b": 2, "c": 3}) + + # Update 'b' with same score + await redis.zadd("myzset", {"b": 2}) + + members = await redis.zrange("myzset", 0, -1) + assert members == ["a", "b", "c"] + + +@pytest.mark.asyncio +async def test_large_sorted_set(redis): + """Test performance with larger sorted set.""" + # Add 1000 members + members_dict = {f"member_{i}": float(i) for i in range(1000)} + added = await redis.zadd("large_set", members_dict) + assert added == 1000 + + # Get all members + all_members = await redis.zrange("large_set", 0, -1) + assert len(all_members) == 1000 + + # Verify they're sorted + for i, member in enumerate(all_members): + assert member == f"member_{i}"