diff --git a/_conf_schema.json b/_conf_schema.json index 95335cf..bee6816 100644 --- a/_conf_schema.json +++ b/_conf_schema.json @@ -2,14 +2,31 @@ "api_url": { "description": "MetingAPI 地址", "type": "string", - "hint": "不带 api? 及后面的后缀,例如:https://api.example.com/meting" + "hint": "不带后面的后缀,例如:https://api.example.com/meting" }, "default_source": { "description": "默认音源", "type": "string", - "hint": "可选:tencent(QQ音乐)、netease(网易云)、kugou(酷狗)、kuwo(酷我)", + "hint": "可选:tencent(QQ音乐)、netease(网易云)", "default": "netease" }, + "api_type": { + "description": "API 类型", + "type": "int", + "hint": "1: Node API , 2: PHP API", + "default": 1 + }, + "use_music_card": { + "description": "使用音乐卡片", + "type": "bool", + "hint": "是否使用音乐卡片显示搜索结果,默认为 true", + "default": true + }, + "api_sign_url": { + "description": "音乐卡片签名地址", + "type": "string", + "hint": "用于获取 API 请求签名的地址" + }, "search_result_count": { "description": "搜索结果显示数量", "type": "int", @@ -42,4 +59,4 @@ "minimum": 10, "maximum": 200 } -} +} \ No newline at end of file diff --git a/main.py b/main.py index f10d19e..469458f 100644 --- a/main.py +++ b/main.py @@ -1,71 +1,36 @@ import asyncio -import ipaddress import os import re import shutil -import tempfile import time import uuid -from urllib.parse import urljoin, urlparse +from urllib.parse import parse_qs, urlparse import aiohttp from astrbot.api import logger from astrbot.api.event import AstrMessageEvent, filter -from astrbot.api.message_components import Record +from astrbot.api.message_components import Json, Record from astrbot.api.star import Context, Star, register +from astrbot.core.config.default import VERSION +from astrbot.core.utils.astrbot_path import get_astrbot_temp_path SOURCE_DISPLAY = { "tencent": "QQ音乐", - "netease": "网易云", - "kugou": "酷狗", - "kuwo": "酷我", + "netease": "网易云音乐", } REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=120) CHUNK_SIZE = 8192 MAX_SESSION_AGE = 3600 -AUDIO_CONTENT_TYPES = { - "audio/mpeg", - "audio/mp3", - "audio/wav", - "audio/x-wav", - "audio/ogg", - "audio/x-m4a", - "audio/mp4", - "audio/x-matroska", - "application/octet-stream", -} TEMP_FILE_PREFIX = "astrbot_meting_plugin_" class MetingPluginError(Exception): - """插件基础异常""" - - pass - - -class DownloadError(MetingPluginError): - """下载错误""" - - pass - - -class UnsafeURLError(MetingPluginError): - """不安全的URL错误""" - - pass - - -class AudioFormatError(MetingPluginError): - """音频格式错误""" - pass class SessionData: - """会话数据封装类""" - def __init__(self, default_source: str): self._source = default_source self._results = [] @@ -96,22 +61,9 @@ def update_timestamp(self): def _detect_audio_format(data: bytes) -> str | None: - """根据文件头检测音频格式 - - Args: - data: 文件开头字节 - - Returns: - str | None: 音频格式标识,未知返回 None - """ if len(data) < 4: return None - - if data.startswith(b"\xff\xfb") or data.startswith(b"\xff\xf3"): - return "mp3" - if data.startswith(b"\xff\xf2"): - return "mp3" - if data.startswith(b"ID3"): + if data.startswith((b"\xff\xfb", b"\xff\xf3", b"\xff\xf2", b"ID3")): return "mp3" if data.startswith(b"RIFF"): return "wav" @@ -119,902 +71,312 @@ def _detect_audio_format(data: bytes) -> str | None: return "ogg" if data.startswith(b"fLaC"): return "flac" - if len(data) >= 8 and data[4:8] == b"ftyp": + if (len(data) >= 8 and data[4:8] == b"ftyp") or data.startswith(b"\x00\x00\x00"): return "mp4" - if data.startswith(b"\x00\x00\x00"): - if len(data) >= 8 and data[4:8] == b"ftyp": - return "mp4" - return None -def _check_audio_magic(data: bytes) -> bool: - """检查文件头是否为有效的音频格式 - - Args: - data: 文件开头字节 - - Returns: - bool: 是否为有效的音频文件头 - """ - return _detect_audio_format(data) is not None - - -def _get_extension_from_format(audio_format: str) -> str: - """根据音频格式获取文件扩展名 - - Args: - audio_format: 音频格式标识 - - Returns: - str: 文件扩展名 - """ - mapping = { - "mp3": ".mp3", - "wav": ".wav", - "ogg": ".ogg", - "flac": ".flac", - "mp4": ".m4a", - } - return mapping.get(audio_format, ".mp3") - - -@register("astrbot_plugin_meting", "chuyegzs", "基于 MetingAPI 的点歌插件", "1.0.0") +@register("astrbot_plugin_meting", "chuyegzs", "基于 MetingAPI 的点歌插件", "1.2.2") class MetingPlugin(Star): - """MetingAPI 点歌插件 - - 支持多音源搜索和播放,自动分段发送长歌曲 - """ - def __init__(self, context: Context, config=None): super().__init__(context) self.config = config self._sessions: dict[str, SessionData] = {} - self._sessions_lock = None self._http_session = None - self._ffmpeg_path = self._find_ffmpeg() - self._cleanup_task = None - self._download_semaphore = None + self._ffmpeg_path = shutil.which("ffmpeg") or "" self._initialized = False - self._init_lock = None - self._session_audio_locks = {} - self._audio_locks_lock = None + self._sessions_lock = asyncio.Lock() + self._download_semaphore = asyncio.Semaphore(3) async def _ensure_initialized(self): - """确保插件已初始化(惰性初始化)""" if self._initialized: return + self._http_session = aiohttp.ClientSession( + timeout=REQUEST_TIMEOUT, + headers={ + "Referer": "https://astrbot.app/", + "User-Agent": f"AstrBot/{VERSION}", + "UAK": "AstrBot/plugin_meting", + }, + ) + self._initialized = True - if self._init_lock is None: - self._init_lock = asyncio.Lock() - - async with self._init_lock: - if self._initialized: - return - - logger.info("MetingAPI 点歌插件正在初始化...") - - self._sessions_lock = asyncio.Lock() - self._audio_locks_lock = asyncio.Lock() - self._download_semaphore = asyncio.Semaphore(3) - - self._http_session = aiohttp.ClientSession(timeout=REQUEST_TIMEOUT) - self._cleanup_task = asyncio.create_task(self._periodic_cleanup()) - self._initialized = True - logger.info("MetingAPI 点歌插件初始化完成") - - async def initialize(self): - """插件初始化(框架调用)""" - await self._ensure_initialized() - - def _get_config(self, key: str, default=None, validator=None): - """获取配置值,支持类型和范围校验 + if self.use_music_card(): + try: + from astrbot.core.pipeline.respond import stage - Args: - key: 配置键 - default: 默认值 - validator: 校验函数,接受配置值,返回校验是否通过 + with open(stage.__file__, "r", encoding="utf-8") as f: + content = f.read() + if "Comp.Json" not in content: + logger.warning( + "检测到当前 AstrBot 版本可能不支持 JSON 消息组件。请更新 AstrBot 版本,否则音乐卡片可能无法发送。" + ) + except Exception as e: + logger.debug(f"检查 AstrBot兼容性失败: {e}") - Returns: - 配置值或默认值 - """ + def _get_config(self, key: str, default=None): if not self.config: return default - - value = self.config.get(key, default) - if validator is not None and not validator(value): - return default - - return value + return self.config.get(key, default) def get_api_url(self) -> str: - """获取 API 地址 - - Returns: - str: API 地址,如果未配置则返回空字符串 - """ - return self._get_config("api_url", "", lambda x: isinstance(x, str) and x) - - def get_default_source(self) -> str: - """获取默认音源 + return str(self._get_config("api_url", "")).rstrip("/") - Returns: - str: 默认音源,默认为 netease - """ - return self._get_config( - "default_source", "netease", lambda x: x in SOURCE_DISPLAY - ) - - def get_search_result_count(self) -> int: - """获取搜索结果显示数量 - - Returns: - int: 搜索结果显示数量,范围 5-30,默认 10 - """ - return self._get_config( - "search_result_count", 10, lambda x: isinstance(x, int) and 5 <= x <= 30 - ) - - def get_segment_duration(self) -> int: - """获取分段时长 - - Returns: - int: 分段时长(秒),默认 120 - """ - return self._get_config( - "segment_duration", 120, lambda x: isinstance(x, int) and 30 <= x <= 300 - ) - - def get_send_interval(self) -> float: - """获取发送间隔 - - Returns: - float: 发送间隔(秒),默认 1.0 - """ - return self._get_config( - "send_interval", 1.0, lambda x: isinstance(x, (int, float)) and 0 <= x <= 10 - ) + def get_api_type(self) -> int: + return int(self._get_config("api_type", 1) or 1) - def get_max_file_size(self) -> int: - """获取最大文件大小 + def get_sign_api_url(self) -> str: + return str( + self._get_config("api_sign_url", "https://oiapi.net/api/QQMusicJSONArk/") + ).rstrip("/") - Returns: - int: 最大文件大小(字节),默认 50MB - """ - mb = self._get_config( - "max_file_size", 50, lambda x: isinstance(x, int) and 10 <= x <= 200 - ) - return mb * 1024 * 1024 + def use_music_card(self) -> bool: + return bool(self._get_config("use_music_card", False)) async def _get_session(self, session_id: str) -> SessionData: - """获取会话状态(线程安全) - - Args: - session_id: 会话 ID - - Returns: - SessionData: 会话状态对象 - """ + await self._ensure_initialized() async with self._sessions_lock: if session_id not in self._sessions: - self._sessions[session_id] = SessionData(self.get_default_source()) + default_source = str(self._get_config("default_source", "netease")) + self._sessions[session_id] = SessionData(default_source) return self._sessions[session_id] - async def _update_session_timestamp(self, session_id: str): - """更新会话时间戳(线程安全) - - Args: - session_id: 会话 ID - """ - async with self._sessions_lock: - if session_id in self._sessions: - self._sessions[session_id].update_timestamp() - await self._cleanup_old_sessions_locked() - - async def _get_session_audio_lock(self, session_id: str) -> asyncio.Lock: - """获取会话级别的音频处理锁 - - Args: - session_id: 会话 ID - - Returns: - asyncio.Lock: 音频处理锁 - """ - async with self._audio_locks_lock: - if session_id not in self._session_audio_locks: - self._session_audio_locks[session_id] = asyncio.Lock() - return self._session_audio_locks[session_id] - - def _find_ffmpeg(self) -> str: - """查找 FFmpeg 路径 - - Returns: - str: FFmpeg 可执行文件路径,未找到返回空字符串 - """ - ffmpeg_exe = shutil.which("ffmpeg") - if ffmpeg_exe: - logger.info(f"找到 FFmpeg: {ffmpeg_exe}") - return ffmpeg_exe - logger.warning("未找到 FFmpeg,请确保已安装 FFmpeg") - return "" - - def _is_private_ip(self, ip_str: str) -> bool: - """判断 IP 是否为私网地址 - - Args: - ip_str: IP 地址字符串 - - Returns: - bool: 是否为私网地址 - """ - try: - ip = ipaddress.ip_address(ip_str) - return ip.is_private or ip.is_loopback or ip.is_link_local - except ValueError: - return False - - async def _resolve_hostname_async(self, hostname: str) -> list: - """异步解析主机名为 IP 地址列表 - - Args: - hostname: 主机名 - - Returns: - list: IP 地址列表 - """ - try: - loop = asyncio.get_running_loop() - addrinfo = await loop.getaddrinfo(hostname, None) - return [addr[4][0] for addr in addrinfo] - except Exception: - return [] - - async def _validate_url(self, url: str) -> tuple[bool, str]: - """验证 URL 是否安全,防止 SSRF 攻击 - - Args: - url: 要验证的 URL - - Returns: - tuple[bool, str]: (是否安全, 失败原因) - """ - try: - parsed = urlparse(url) - if parsed.scheme not in ("http", "https"): - return False, f"不支持的协议: {parsed.scheme}" - - hostname = parsed.hostname or "" - if not hostname: - return False, "URL 缺少主机名" - - if hostname in ("localhost", "0.0.0.0"): - return False, f"禁止访问本地地址: {hostname}" - - ip_match = re.match(r"^(\d+\.){3}\d+$", hostname) - if ip_match: - if self._is_private_ip(hostname): - return False, f"禁止访问私网地址: {hostname}" - else: - ips = await self._resolve_hostname_async(hostname) - if not ips: - logger.warning(f"[URL 验证] 无法解析主机名: {hostname}") - return False, f"无法解析主机名: {hostname}" - for ip in ips: - if self._is_private_ip(ip): - return False, f"主机名解析到私网地址: {hostname} -> {ip}" - - return True, "" - except Exception as e: - logger.error(f"URL 验证失败: {e}") - return False, f"URL 验证异常: {e}" - - async def _validate_api_url(self, url: str) -> tuple[bool, str]: - """验证 API URL 是否安全 - - Args: - url: API URL - - Returns: - tuple[bool, str]: (是否安全, 失败原因) - """ - is_valid, reason = await self._validate_url(url) - if not is_valid: - logger.warning(f"[API URL 验证] URL 验证失败: {url}, 原因: {reason}") - return False, reason - - parsed = urlparse(url) - hostname = parsed.hostname or "" - if hostname in ("localhost", "127.0.0.1", "0.0.0.0"): - logger.warning(f"[API URL 验证] 禁止使用本地地址: {hostname}") - return False, "API 地址不允许使用本地地址" - - logger.debug(f"[API URL 验证] URL 验证通过: {url}") - return True, "" - - async def _cleanup_old_sessions_locked(self): - """清理过期的会话状态(必须在持锁状态下调用)""" - current_time = time.time() - expired_sessions = [ - sid - for sid, session in self._sessions.items() - if current_time - session.timestamp > MAX_SESSION_AGE - ] - for sid in expired_sessions: - self._sessions.pop(sid, None) - self._session_audio_locks.pop(sid, None) - if expired_sessions: - logger.debug(f"清理了 {len(expired_sessions)} 个过期会话") - - async def _periodic_cleanup(self): - """定期清理过期的会话状态和临时文件""" - while True: - try: - await asyncio.sleep(3600) - async with self._sessions_lock: - await self._cleanup_old_sessions_locked() - self._cleanup_temp_files() - logger.debug("定期清理完成") - except asyncio.CancelledError: - break - except Exception as e: - logger.error(f"定期清理时发生错误: {e}") - - def _cleanup_temp_files(self): - """清理本插件产生的临时文件""" - try: - temp_dir = tempfile.gettempdir() - count = 0 - for filename in os.listdir(temp_dir): - if filename.startswith(TEMP_FILE_PREFIX): - filepath = os.path.join(temp_dir, filename) - try: - if os.path.isfile(filepath): - file_age = time.time() - os.path.getmtime(filepath) - if file_age > 300: - os.remove(filepath) - count += 1 - except Exception: - pass - if count > 0: - logger.debug(f"清理了 {count} 个临时文件") - except Exception as e: - logger.error(f"清理临时文件时发生错误: {e}") - - async def _get_session_source(self, session_id: str) -> str: - """获取会话音源 - - Args: - session_id: 会话 ID - - Returns: - str: 会话音源,如果未设置则返回默认音源 - """ - session = await self._get_session(session_id) - return session.source - - async def _set_session_source(self, session_id: str, source: str): - """设置会话音源 - - Args: - session_id: 会话 ID - source: 音源 - """ - session = await self._get_session(session_id) - session.source = source - await self._update_session_timestamp(session_id) - - async def _set_session_results(self, session_id: str, results: list): - """设置会话搜索结果 - - Args: - session_id: 会话 ID - results: 搜索结果列表 - """ - session = await self._get_session(session_id) - session.results = results - await self._update_session_timestamp(session_id) - - async def _get_session_results(self, session_id: str) -> list: - """获取会话搜索结果 - - Args: - session_id: 会话 ID - - Returns: - list: 搜索结果列表 - """ - session = await self._get_session(session_id) - return session.results - - @filter.command("切换QQ音乐") + @filter.command("切换QQ音乐", alias={"切换腾讯音乐", "切换腾讯点歌", "切换TencentMusic", "切换QQMusic"}) async def switch_tencent(self, event: AstrMessageEvent): - """切换当前会话的音源为QQ音乐""" - await self._ensure_initialized() - session_id = event.unified_msg_origin - await self._set_session_source(session_id, "tencent") + (await self._get_session(event.unified_msg_origin)).source = "tencent" yield event.plain_result("已切换音源为QQ音乐") - @filter.command("切换网易云") + @filter.command("切换网易云", alias={"切换网易云音乐", "切换网易点歌", "切换网抑云", "切换网抑云音乐", "切换NeteaseMusic", "切换Netease"}) async def switch_netease(self, event: AstrMessageEvent): - """切换当前会话的音源为网易云""" - await self._ensure_initialized() - session_id = event.unified_msg_origin - await self._set_session_source(session_id, "netease") - yield event.plain_result("已切换音源为网易云") - - @filter.command("切换酷狗") - async def switch_kugou(self, event: AstrMessageEvent): - """切换当前会话的音源为酷狗""" - await self._ensure_initialized() - session_id = event.unified_msg_origin - await self._set_session_source(session_id, "kugou") - yield event.plain_result("已切换音源为酷狗") - - @filter.command("切换酷我") - async def switch_kuwo(self, event: AstrMessageEvent): - """切换当前会话的音源为酷我""" - await self._ensure_initialized() - session_id = event.unified_msg_origin - await self._set_session_source(session_id, "kuwo") - yield event.plain_result("已切换音源为酷我") + (await self._get_session(event.unified_msg_origin)).source = "netease" + yield event.plain_result("已切换音源为网易云音乐") @filter.regex(r"^点歌(\d+)$") async def play_song_by_index(self, event: AstrMessageEvent): - """播放指定序号的歌曲(点歌x格式,不带空格) - - Args: - event: 消息事件 - """ await self._ensure_initialized() - - message_str = event.get_message_str().strip() session_id = event.unified_msg_origin - - match = re.match(r"^点歌(\d+)$", message_str) + match = re.match(r"^点歌(\d+)$", event.get_message_str().strip()) if not match: return - index = int(match.group(1)) - logger.info(f"[点歌] 播放模式,序号: {index}") - - results = await self._get_session_results(session_id) - logger.info(f"[点歌] 会话结果数量: {len(results)}") - - if not results: - yield event.plain_result('请先使用"点歌 歌曲名"搜索歌曲') + session = await self._get_session(session_id) + if not session.results: + yield event.plain_result("请先搜索歌曲") return - - if index < 1 or index > len(results): - yield event.plain_result( - f"序号超出范围,请输入 1-{len(results)} 之间的序号" - ) + if index < 1 or index > len(session.results): + yield event.plain_result("序号超出范围") return - - song = results[index - 1] - song_url = song.get("url") - + song = session.results[index - 1] + song_url = song.get("url", "") if not song_url: - yield event.plain_result("获取歌曲播放地址失败") + yield event.plain_result("获取歌曲地址失败") return - is_valid, reason = await self._validate_url(song_url) - if not is_valid: - logger.error(f"检测到不安全的 URL: {song_url}, 原因: {reason}") - yield event.plain_result(f"歌曲地址无效: {reason}") - return + if self.use_music_card(): + title = song.get("name") or song.get("title", "未知") + artist = song.get("artist") or song.get("author", "未知歌手") + source = song.get("source") or session.source + cover = song.get("pic", "") + if cover: + if source == "netease": + connector = "&" if "?" in cover else "?" + cover = f"{cover}{connector}picsize=320" + try: + if self._http_session: + async with self._http_session.get( + cover, allow_redirects=False + ) as c_resp: + if c_resp.status in (301, 302): + cover = c_resp.headers.get("Location", cover) + except Exception as e: + logger.warning(f"解析封面跳转失败: {e}") + song_id = "" + try: + query = urlparse(song_url).query + song_id = parse_qs(query).get("id", [""])[0] + except Exception: + pass - try: - temp_file = await self._download_song(song_url, event.get_sender_id()) - if not temp_file: + if source == "netease": + jump_url = f"https://music.163.com/#/song?id={song_id}" + fmt = "163" + elif source == "tencent": + jump_url = f"https://y.qq.com/n/ryqq/songDetail/{song_id}" + fmt = "qq" + else: + jump_url = song_url.replace("type=url", "type=song") + fmt = "163" + + if not self._http_session: + yield event.plain_result("HTTP Session 未初始化") return - yield event.plain_result("正在分段录制歌曲...") - async for result in self._split_and_send_audio( - event, temp_file, session_id - ): - yield result - - except asyncio.CancelledError: - logger.info("播放任务被取消") - yield event.plain_result("播放已取消") - except DownloadError as e: - logger.error(f"下载歌曲失败: {e}") - yield event.plain_result(f"下载失败: {e}") - except UnsafeURLError as e: - logger.error(f"URL 安全检查失败: {e}") - yield event.plain_result(f"安全检查失败: {e}") - except AudioFormatError as e: - logger.error(f"音频格式错误: {e}") - yield event.plain_result(f"格式不支持: {e}") + sign_api = self.get_sign_api_url() + params = { + "url": song_url, + "song": title, + "singer": artist, + "cover": cover, + "jump": jump_url, + "format": fmt, + } + try: + async with self._http_session.get(sign_api, params=params) as resp: + if resp.status != 200: + yield event.plain_result(f"签名接口请求失败: {resp.status}") + return + res_json = await resp.json() + if res_json.get("code") == 1: + ark_data = res_json.get("data") + token = ark_data.get("config", {}).get("token", "") + json_card = Json(data=ark_data, config={"token": token}) + logger.info("音乐卡片签名成功,发送卡片") + logger.debug(f"卡片数据: {json_card}") + yield event.chain_result([json_card]) + else: + yield event.plain_result( + f"签名失败: {res_json.get('message', '未知错误')}" + ) + except Exception as e: + logger.error(f"音乐卡片请求异常: {e}") + yield event.plain_result("制作卡片时出错") + return + try: + temp_file = await self._download_song(song_url) + if temp_file: + yield event.plain_result("正在分段发送语音...") + async for result in self._split_and_send_audio(event, temp_file): + yield result except Exception as e: - logger.error(f"播放歌曲时发生错误: {e}", exc_info=True) - yield event.plain_result("播放失败,请稍后重试") + yield event.plain_result(f"播放失败: {e}") @filter.command("点歌") async def search_song(self, event: AstrMessageEvent): - """搜索歌曲(点歌 xxx格式,带空格) - - Args: - event: 消息事件 - """ - await self._ensure_initialized() - - message_str = event.get_message_str().strip() - session_id = event.unified_msg_origin - - if message_str.startswith("点歌"): - keyword = message_str[2:].strip() - else: - keyword = message_str - - if not keyword: - yield event.plain_result("请输入要搜索的歌曲名称,例如:点歌 一期一会") + msg = event.get_message_str().strip() + kw = msg[2:].strip() if msg.startswith("点歌") else msg + if not kw: return - logger.info(f"[点歌] 搜索模式,关键词: {keyword}") + session = await self._get_session(event.unified_msg_origin) + async for result in self._search_song_with_source(event, kw, session.source): + yield result - api_url = self.get_api_url() - if not api_url: - yield event.plain_result("请先在插件配置中设置 MetingAPI 地址") + @filter.command("腾讯点歌", alias={"QQ点歌", "QQ音乐点歌", "腾讯音乐点歌"}) + async def search_tencent_song(self, event: AstrMessageEvent): + msg = event.get_message_str().strip() + kw = msg[4:].strip() if msg.startswith("腾讯点歌") else msg + if not kw: return - - is_valid, reason = await self._validate_api_url(api_url) - if not is_valid: - logger.error(f"API URL 验证失败: {reason}") - yield event.plain_result(f"API 地址配置无效: {reason}") + async for result in self._search_song_with_source(event, kw, "tencent"): + yield result + + @filter.command("网易点歌", alias={"网易云点歌", "网抑云点歌", "网易云音乐点歌"}) + async def search_netease_song(self, event: AstrMessageEvent): + msg = event.get_message_str().strip() + kw = msg[4:].strip() if msg.startswith("网易点歌") else msg + if not kw: return + async for result in self._search_song_with_source(event, kw, "netease"): + yield result - source = await self._get_session_source(session_id) - logger.info(f"[点歌] API URL: {api_url}, 音源: {source}, 关键词: {keyword}") + async def _search_song_with_source( + self, event: AstrMessageEvent, kw: str, source: str + ): + await self._ensure_initialized() + api_url = self.get_api_url() + api_type = self.get_api_type() + session = await self._get_session(event.unified_msg_origin) try: - params = {"server": source, "type": "search", "id": keyword} - api_endpoint = f"{api_url}/api" - logger.debug(f"[点歌] 请求 API: {api_endpoint}, 参数: {params}") - - async with self._http_session.get(api_endpoint, params=params) as resp: - logger.debug(f"[点歌] API 响应状态码: {resp.status}") - if resp.status != 200: - response_text = await resp.text() - logger.error( - f"搜索失败,API 返回状态码: {resp.status}, 响应: {response_text[:500]}" - ) - yield event.plain_result(f"搜索失败,API 返回状态码: {resp.status}") - return + params = ( + { + "server": source, + "type": "search", + "id": "0", + "dwrc": "false", + "keyword": kw, + } + if api_type == 2 + else {"server": source, "type": "search", "id": kw} + ) + api_endpoint = api_url if api_type == 2 else f"{api_url}/api" - try: - data = await resp.json() - logger.debug( - f"[点歌] API 返回数据类型: {type(data)}, 数据量: {len(data) if isinstance(data, list) else 'N/A'}" - ) - except Exception as e: - response_text = await resp.text() - logger.error( - f"解析 JSON 响应失败: {e}, 响应内容: {response_text[:500]}" - ) - yield event.plain_result( - f"API 响应解析失败,请检查 API 地址是否正确" - ) - return - - if not isinstance(data, list): - logger.error(f"API 返回异常数据类型: {type(data)}, 内容: {data}") - yield event.plain_result("API 返回异常,请稍后重试") + if not self._http_session: + yield event.plain_result("HTTP Session 未初始化") return - if not data or len(data) == 0: - yield event.plain_result(f"未找到歌曲: {keyword}") + async with self._http_session.get(api_endpoint, params=params) as resp: + data = await resp.json() + if not isinstance(data, list) or not data: + yield event.plain_result(f"未找到歌曲: {kw}") return - - result_count = self.get_search_result_count() - results = data[:result_count] - await self._set_session_results(session_id, results) - - message = f"搜索结果(音源: {SOURCE_DISPLAY.get(source, source)}):\n" - for idx, song in enumerate(results, 1): - name = song.get("title", "未知") - artist = song.get("author", "未知歌手") - message += f"{idx}. {name} - {artist}\n" - - message += '\n发送"点歌1"播放第一首歌曲' - yield event.plain_result(message) - - except aiohttp.ClientError as e: - logger.error(f"搜索歌曲时网络错误: {e}", exc_info=True) - yield event.plain_result(f"网络错误: {e}") + result_count = self._get_config("search_result_count", 10) or 10 + session.results = data[: int(result_count)] + res_msg = f"搜索结果 ({SOURCE_DISPLAY.get(source, source)}):\n" + for i, s in enumerate(session.results, 1): + res_msg += f"{i}. {s.get('name') or s.get('title')} - {s.get('artist') or s.get('author')}\n" + res_msg += "\n输入 '点歌序号' 播放" + yield event.plain_result(res_msg) except Exception as e: - logger.error(f"搜索歌曲时发生错误: {e}", exc_info=True) yield event.plain_result(f"搜索失败: {e}") - async def _download_song(self, url: str, sender_id: str) -> str | None: - """下载歌曲文件 + async def _download_song(self, url: str) -> str | None: + temp_path = get_astrbot_temp_path() + if not os.path.exists(temp_path): + os.makedirs(temp_path) - Args: - url: 歌曲 URL - sender_id: 发送者 ID + temp_file = os.path.join(temp_path, f"{TEMP_FILE_PREFIX}{uuid.uuid4()}.tmp") - Returns: - str | None: 临时文件路径,失败返回 None - """ - if not self._http_session: - raise DownloadError("HTTP session 未初始化") - - temp_dir = tempfile.gettempdir() - safe_sender_id = "".join(c for c in str(sender_id) if c.isalnum() or c in "._-") - - download_success = False - max_retries = 3 - retry_count = 0 - temp_file = None - detected_format = None - - while retry_count < max_retries: - try: - async with self._download_semaphore: - logger.debug( - f"开始下载歌曲 (尝试 {retry_count + 1}/{max_retries}): {url}" - ) - - current_url = url - redirect_count = 0 - max_redirects = 5 - - while redirect_count < max_redirects: - is_valid, reason = await self._validate_url(current_url) - if not is_valid: - raise UnsafeURLError(f"URL 验证失败: {reason}") - - async with self._http_session.get( - current_url, allow_redirects=False - ) as resp: - if resp.status in (301, 302, 307, 308): - redirect_url = resp.headers.get("Location", "") - if not redirect_url: - raise DownloadError("重定向响应缺少 Location 头") - - current_url = urljoin(current_url, redirect_url) - logger.debug(f"跟随重定向: {current_url}") - redirect_count += 1 - continue - - if resp.status != 200: - raise DownloadError(f"下载失败,状态码: {resp.status}") - - content_type = resp.headers.get("Content-Type", "") - if not self._is_audio_content(content_type): - raise AudioFormatError( - f"不支持的 Content-Type: {content_type}" - ) - - max_file_size = self.get_max_file_size() - total_size = 0 - first_chunk = None - temp_file = os.path.join( - temp_dir, - f"{TEMP_FILE_PREFIX}{safe_sender_id}_{uuid.uuid4()}.tmp", - ) - - with open(temp_file, "wb") as f: - try: - async for chunk in resp.content.iter_chunked( - CHUNK_SIZE - ): - if first_chunk is None and chunk: - first_chunk = chunk - detected_format = _detect_audio_format( - first_chunk - ) - if not detected_format: - raise AudioFormatError( - "文件头检测失败,不是有效的音频文件" - ) - - f.write(chunk) - total_size += len(chunk) - if total_size > max_file_size: - raise DownloadError( - f"文件过大,已超过 {max_file_size} 字节" - ) - except aiohttp.ClientPayloadError as e: - raise DownloadError(f"连接中断: {e}") from e - - file_size = os.path.getsize(temp_file) - if file_size == 0: - raise DownloadError("下载的文件为空") - - file_ext = _get_extension_from_format(detected_format) - final_file = temp_file + file_ext - os.rename(temp_file, final_file) - temp_file = final_file - - logger.info( - f"歌曲下载成功,文件大小: {file_size} 字节,格式: {detected_format}" - ) - download_success = True - return temp_file - - raise DownloadError(f"重定向次数超过限制: {max_redirects}") - - except (aiohttp.ClientError, aiohttp.ClientPayloadError) as e: - retry_count += 1 - logger.error( - f"下载歌曲时网络错误 (尝试 {retry_count}/{max_retries}): {e}" + async with self._download_semaphore: + if not self._http_session: + return None + async with self._http_session.get(url, allow_redirects=True) as resp: + if resp.status != 200: + return None + content = await resp.read() + + detected_format = _detect_audio_format(content[:1024]) + ext_map = { + "mp3": ".mp3", + "wav": ".wav", + "ogg": ".ogg", + "flac": ".flac", + "mp4": ".m4a", + } + ext = ( + ext_map.get(str(detected_format), ".mp3") + if detected_format + else ".mp3" ) - if retry_count >= max_retries: - raise DownloadError(f"网络错误: {e}") from e - await asyncio.sleep(1) - except (DownloadError, UnsafeURLError, AudioFormatError): - raise - except Exception as e: - logger.error(f"下载歌曲时发生错误: {e}", exc_info=True) - raise DownloadError(f"下载失败: {e}") from e - finally: - if not download_success and temp_file and os.path.exists(temp_file): - try: - os.remove(temp_file) - logger.debug("清理临时文件") - except Exception: - pass - return None + with open(temp_file, "wb") as f: + f.write(content) + os.rename(temp_file, temp_file + ext) + return temp_file + ext - def _is_audio_content(self, content_type: str) -> bool: - """判断 Content-Type 是否为音频 - - Args: - content_type: Content-Type 头 - - Returns: - bool: 是否为音频 - """ - if not content_type: - return False - content_type_lower = content_type.lower().split(";")[0].strip() - return content_type_lower in AUDIO_CONTENT_TYPES - - def _iterate_audio_segments(self, audio, segment_ms: int): - """迭代音频片段(生成器方式,降低内存占用) - - Args: - audio: AudioSegment 对象 - segment_ms: 每段的毫秒数 - - Yields: - tuple: (片段索引, 音频片段) - """ - total_duration = len(audio) - idx = 1 - for start in range(0, total_duration, segment_ms): - end = min(start + segment_ms, total_duration) - segment = audio[start:end] - yield idx, segment - idx += 1 - - def _export_segment(self, segment, segment_file: str) -> bool: - """导出音频片段到文件 - - Args: - segment: AudioSegment 片段 - segment_file: 目标文件路径 - - Returns: - bool: 是否成功 - """ - try: - segment.export(segment_file, format="wav") - return True - except Exception as e: - logger.error(f"导出音频片段失败: {e}") - return False - - async def _split_and_send_audio( - self, event: AstrMessageEvent, temp_file: str, session_id: str - ): - """分割音频并发送 - - Args: - event: 消息事件 - temp_file: 音频文件路径 - session_id: 会话 ID,用于获取会话级别的锁 - """ - temp_files_to_cleanup = [temp_file] - - try: - if not self._ffmpeg_path: - logger.error("FFmpeg 路径为空") - yield event.plain_result("未找到 FFmpeg,请确保已安装 FFmpeg") - return - - try: - from pydub import AudioSegment - - AudioSegment.converter = self._ffmpeg_path - except ImportError as e: - logger.error(f"导入 pydub 失败: {e}") - yield event.plain_result("缺少音频处理依赖,请联系管理员") - return - - audio_lock = await self._get_session_audio_lock(session_id) - async with audio_lock: - try: - logger.debug(f"开始处理音频文件: {temp_file}") + async def _split_and_send_audio(self, event, temp_file): + if not self._ffmpeg_path: + logger.error("未找到 ffmpeg,无法处理音频。请安装 ffmpeg。") + yield event.plain_result("未找到 ffmpeg,无法播放音频") + return - try: - audio = AudioSegment.from_file(temp_file) - except Exception as e: - logger.error(f"音频文件解码失败: {e}") - yield event.plain_result("音频文件格式不支持或已损坏") - return + from pydub import AudioSegment - total_duration = len(audio) - segment_ms = self.get_segment_duration() * 1000 - send_interval = self.get_send_interval() - logger.debug( - f"音频总时长: {total_duration}ms, 分段时长: {segment_ms}ms" - ) + AudioSegment.converter = self._ffmpeg_path + audio = AudioSegment.from_file(temp_file) - base_name = os.path.splitext(os.path.basename(temp_file))[0] - success_count = 0 + duration = self._get_config("segment_duration", 120) or 120 + seg_ms = int(duration) * 1000 - for idx, segment in self._iterate_audio_segments(audio, segment_ms): - segment_file = os.path.join( - tempfile.gettempdir(), - f"{base_name}_segment_{idx}_{uuid.uuid4()}.wav", - ) - temp_files_to_cleanup.append(segment_file) - - if not self._export_segment(segment, segment_file): - continue - - try: - record = Record.fromFileSystem(segment_file) - yield event.chain_result([record]) - await asyncio.sleep(send_interval) - success_count += 1 - except Exception as e: - logger.error(f"发送语音片段 {idx} 时发生错误: {e}") - yield event.plain_result(f"发送语音片段 {idx} 失败") - - try: - if os.path.exists(segment_file): - os.remove(segment_file) - temp_files_to_cleanup.remove(segment_file) - except Exception: - pass - - if success_count > 0: - yield event.plain_result("歌曲播放完成") - - except asyncio.CancelledError: - logger.info("音频处理任务被取消") - yield event.plain_result("音频处理已取消") - except Exception as e: - logger.error(f"分割音频时发生错误: {e}", exc_info=True) - yield event.plain_result("音频处理失败,请稍后重试") - finally: - for f in temp_files_to_cleanup: - try: - if os.path.exists(f): - os.remove(f) - logger.debug(f"清理临时文件: {f}") - except Exception: - pass - - async def terminate(self): - """插件终止时清理资源""" - if self._cleanup_task: - self._cleanup_task.cancel() - try: - await self._cleanup_task - except asyncio.CancelledError: - pass - - if self._http_session: - await self._http_session.close() - self._http_session = None + send_interval = self._get_config("send_interval", 1.0) or 1.0 + interval = float(send_interval) - self._sessions.clear() - self._session_audio_locks.clear() - - self._initialized = False - self._cleanup_temp_files() + for i, start in enumerate(range(0, len(audio), seg_ms), 1): + path = f"{temp_file}_{i}.wav" + audio[start : start + seg_ms].export(path, format="wav") + yield event.chain_result([Record(path)]) + if os.path.exists(path): + os.remove(path) + await asyncio.sleep(interval) + if os.path.exists(temp_file): + os.remove(temp_file)