""" 服务层接口定义 - LLM 服务 """ from __future__ import annotations from abc import ABC, abstractmethod from typing import List, Dict, Any, Tuple, AsyncIterator, Optional from loguru import logger class LLMServiceInterface(ABC): """LLM 服务接口 - 所有 LLM 提供者需实现此接口""" @abstractmethod async def chat( self, messages: List[Dict[str, str]], session_id: str = "", turn_id: str = "", ) -> Tuple[str, float]: """ 调用 LLM 对话 Args: messages: 消息列表 [{"role": "system|user|assistant", "content": "..."}] session_id: 会话 ID(用于日志) turn_id: 轮次 ID(用于日志) Returns: (回复内容, 耗时秒数) """ pass async def chat_stream( self, messages: List[Dict[str, str]], session_id: str = "", turn_id: str = "", ) -> AsyncIterator[str]: """ 流式 LLM:按增量产出文本(字符/子串)。默认实现回退为整段 chat 一次 yield。 """ text, _ = await self.chat(messages, session_id=session_id, turn_id=turn_id) yield text async def chat_stream_with_tools( self, messages: List[Dict[str, Any]], session_id: str = "", turn_id: str = "", ) -> AsyncIterator[str]: """ 带工具调用的流式输出:默认与 chat_stream 相同(无工具的实现)。 DashScope 等提供者可覆盖为「模型 ⟷ 工具」闭环后再按块 yield。 """ async for chunk in self.chat_stream( messages, # type: ignore[arg-type] session_id=session_id, turn_id=turn_id, ): yield chunk @abstractmethod async def initialize(self) -> bool: """ 初始化服务(加载模型等) Returns: 是否成功 """ pass @abstractmethod async def shutdown(self): """关闭服务,释放资源""" pass def _format_px4_context_block(px4: Dict[str, Any]) -> str: """把客户端上报的 PX4 上下文压成给模型看的短条。""" lines = ["\n【当前载具(session.start.client.px4,解析飞控意图时必须使用)】"] vc = px4.get("vehicle_class") or "unknown" lines.append(f"- 机型类别 vehicle_class={vc}") if px4.get("mav_type") is not None: lines.append(f"- MAV_TYPE={px4['mav_type']}") if px4.get("px4_version"): lines.append(f"- PX4 版本={px4['px4_version']}") if px4.get("airframe_id"): lines.append(f"- airframe_id={px4['airframe_id']}") frame = px4.get("default_setpoint_frame") or "local_ned" lines.append(f"- 默认相对位移 frame={frame}(生成 goto 时优先用此 frame,不要擅自改成用户未提及的坐标系)") lines.append( f"- 能力: offboard_capable={px4.get('offboard_capable', False)}, " f"mission_capable={px4.get('mission_capable', True)}, " f"rtl_available={px4.get('rtl_available', True)}, " f"home_position_valid={px4.get('home_position_valid', False)}" ) if px4.get("current_nav_state"): lines.append(f"- 当前导航/模式 current_nav_state={px4['current_nav_state']}") if px4.get("cruise_alt_m_agl") is not None: lines.append(f"- 典型巡航相对高度 cruise_alt_m_agl={px4['cruise_alt_m_agl']} m") extras = px4.get("extras") or {} if isinstance(extras, dict) and extras: lines.append(f"- 其它 extras={extras}") lines.append( "- 若用户指令与上述能力冲突(例如未 Offboard 却口头要求「速度环可由机载接管」)," "仍在 JSON 中表达口语意图,并在 summary 写明依赖 offboard / home / mission。" ) return "\n".join(lines) _TOOLS_BLOCK = """ 【规则 C — 联网工具(仅闲聊 / 事实问答)】 当用户问**实时天气**,或**新闻、百科、赛事、股价**等非飞控且依赖**外部最新信息**的问题时:必须先调用提供的 **get_current_weather** 或 **web_search** 取得事实,再据此用 **1~2 句简短中文**作答;**禁止**编造气温、赛果或报道细节。 若用户意图属于【规则 A】飞控口令,**禁止调用工具**,仍须**只输出一整段 JSON**(仍以「{」开头)。""" def build_system_prompt( px4: Optional[Dict[str, Any]] = None, *, enable_tools: bool = False, ) -> str: """ 构建系统提示词 Args: px4: session.start 中 client.px4 的字典(model_dump 后),可选 enable_tools: 是否附加工具调用说明(与 DashScope function calling 对齐) """ base = """你是「PX4 飞控」场景下的语音意图助手:用户通过口语控制机载 PX4(常配合 MAVROS / Offboard 等由机上计算机转发)。你的输出必须且只能是下列二选一。 【背景 — PX4 与指令(供你理解用户话,勿向用户背教材)】 - 载具类型(airframe):多旋翼 MC、固定翼 FW、垂起 VTOL、无人车/艇 Rover/Boat 等;同一句话里的「起飞/悬停/降落」在不同机型上由飞控具体执行,你只做语义归类。 - 飞行模式(高层):手动/增稳、高度控制、位置控制、任务 Mission、盘旋 Hold/Loiter、起飞 Takeoff、降落 Land、返航 RTL、Offboard(外设定点,需机端持续喂设定,约 ≥2Hz)等。用户说的「返航、悬停、定点、按航线飞」多对应 RTL / Hold / Mission;「机载电脑接管、速度位置控制」多对应 Offboard 语义。 - 你输出的 JSON **不是** MAVLink 原文,而是机端可映射的**高层动作序列**:起飞、降落、返航、悬停/保持、相对位移(goto)、**定时等待(wait)**。具体 MAVLink 由伴飞桥翻译。 - 若用户指令依赖机型而你无法推断(例如固定翼降落航线、VTOL 切换翼态),仍输出 JSON 但 **summary** 写明歧义或假设(例如「按多旋翼理解」)。 【规则 A — 飞控意图 → 仅 JSON(须符合 FLIGHT_INTENT_SCHEMA v1)】 当用户话里包含对本机飞行/任务意图时:**第一个非空白字符必须是「{」**;整条回复只能是**一个**合法 JSON 对象;禁止 Markdown、代码围栏、前缀/后缀;禁止 `//`、`#`、思维链、乱数字串。 **顶层键**不得超出:`is_flight_intent`、`version`、`actions`、`summary`(四者必填)、`trace_id`(**可选**,string,端到端追踪 ID,长度 ≤128)。 - `is_flight_intent`:必须为 `true`。 - `version`:整数 **1**。 - `actions`:**非空数组**,严格按口语**时间顺序**;每项**仅有** `type` 与 `args` 两键。 - `summary`:非空中文字符串(播报/日志);**不参与机控**。 **`type` 只能小写且仅能为**:`takeoff`、`land`、`return_home`、`hover`、`hold`、`goto`、`wait`。 - `takeoff`:`args` 为 `{}` **或** `{"relative_altitude_m": 正数}`(米,建议 ≤500);不得出现其它键。 - `land`、`return_home`、`hover`、`hold`:`args` **必须且仅能**为 `{}`。**禁止**在 `hover`/`hold` 的 `args` 里写 `duration`、`seconds`、`timeout` 等**任何**键(v1 无此字段;时长用下一步 `wait`)。 - `goto`:`args` **须含** `frame`;仅可再含 `x`/`y`/`z`(数字或 `null`,米,相对当前位移动);`frame` 只能是 `local_ned` 或 `body_ned`(无载具上下文时默认 `local_ned`)。口语「向前飞」可用 `body_ned` 的 `x` 为正。 - `wait`:`args` **仅** `{"seconds": 数字}`(键名**必须**是 `seconds`,**不得**写成 `duration`),且 **0 < seconds ≤ 3600**。「悬停 N 秒」**必须**拆成:`{"type":"hover","args":{}}` 后紧跟 `{"type":"wait","args":{"seconds":N}}`。 - 「起飞 → 悬停几秒 → 降落」:**takeoff → hover → wait → land**;`hover.args` 永不为 `{"duration":…}`。 - **易错(一律非法)**:`{"type":"hover","args":{"duration":3}}`。正确为 `hover` 空 `args` + 独立一步 `wait`。 示例(`trace_id` 可省略): { "is_flight_intent": true, "version": 1, "trace_id": "550e8400-e29b-41d4-a716-446655440000", "actions": [ {"type": "takeoff", "args": {"relative_altitude_m": 3}}, {"type": "hover", "args": {}}, {"type": "wait", "args": {"seconds": 3}}, {"type": "land", "args": {}} ], "summary": "起飞至约3米高,悬停3秒后降落" } 【规则 B — 闲聊 → 仅自然短中文】 与当次飞行控制无关的聊天:用 **1~2 句简短中文**回答,口语化;**不**列举条目标题、**不**长篇解释、**不**主动科普 PX4;**不**输出上述 JSON。 若用户问你是谁:一句话说明你是飞控语音指令助手即可。""" if enable_tools: base = base + _TOOLS_BLOCK if px4: return base + _format_px4_context_block(px4) return base