"""Streaming LLM + TTS smoke test (no audio playback, no interaction).

Usage (from the project root):
    python test/test_streaming_smoke.py

Requires: the server is already running and the LLM/TTS settings in .env work.
The VOICE_TEST_WS_URL environment variable can override the WebSocket URL.
"""

from __future__ import annotations

import asyncio
import json
import sys
import time
import traceback
from pathlib import Path

import websockets

# test_config lives next to this script; make it importable when the script
# is launched from the project root.
sys.path.insert(0, str(Path(__file__).parent))
from test_config import SERVER_URL, AUTH_TOKEN, DEVICE_ID  # noqa: E402


async def one_turn(
    ws,
    *,
    text: str,
    expect_routing: str,
) -> dict:
    """Send one ``turn.text`` message and consume the reply until it ends.

    Binary frames are accumulated as PCM bytes; JSON frames are dispatched on
    their ``type`` field. Frame types not handled below are recorded in the
    order log and otherwise ignored.

    Args:
        ws: An open WebSocket connection exposing awaitable ``send``/``recv``.
        text: The utterance sent as the final text of the turn.
        expect_routing: The ``routing`` value the ``dialog_result`` must carry.

    Returns:
        A dict with ``ok`` set to ``True`` plus turn statistics when
        ``turn.complete`` arrives after a ``dialog_result`` with the expected
        routing. Otherwise a dict with ``ok`` set to ``False``, an ``error``
        message and the ``order`` of message types received so far.

    Raises:
        asyncio.TimeoutError: If no frame arrives within 120 seconds.
    """
    turn_id = f"smoke-{int(time.time() * 1000)}"
    deltas: list[str] = []
    dialog_order: list[str] = []
    pcm = bytearray()
    saw_dialog = False
    # None until the first TTS chunk; then True if that chunk arrived before
    # the dialog_result, False otherwise.
    first_tts_before_dialog: bool | None = None

    await ws.send(
        json.dumps(
            {
                "type": "turn.text",
                "proto_version": "1.0",
                "transport_profile": "text_uplink",
                "turn_id": turn_id,
                "text": text,
                "is_final": True,
                "source": "device_stt",
            },
            ensure_ascii=False,
        )
    )

    while True:
        msg = await asyncio.wait_for(ws.recv(), timeout=120)
        if isinstance(msg, bytes):
            pcm.extend(msg)
            continue

        data = json.loads(msg)
        t = data.get("type")
        dialog_order.append(t)

        if t == "llm.text_delta":
            if data.get("delta"):
                deltas.append(data["delta"])
        elif t == "dialog_result":
            saw_dialog = True
            routing = data.get("routing")
            if routing != expect_routing:
                return {
                    "ok": False,
                    "error": f"routing 期望 {expect_routing} 实际 {routing}",
                    "order": dialog_order,
                }
        elif t == "tts_audio_chunk":
            if first_tts_before_dialog is None:
                first_tts_before_dialog = not saw_dialog
        elif t == "turn.complete":
            if not saw_dialog:
                # Without a dialog_result the routing was never checked, so
                # reporting success here would be a false pass.
                return {
                    "ok": False,
                    "error": (
                        "未收到 dialog_result,无法校验 routing"
                        f"(期望 {expect_routing})"
                    ),
                    "order": dialog_order,
                }
            reply_concat = "".join(deltas)
            return {
                "ok": True,
                "turn_id": turn_id,
                "delta_chunks": len(deltas),
                "reply_preview": reply_concat[:120],
                "pcm_bytes": len(pcm),
                "order_tail": dialog_order[-8:],
                "first_tts_before_dialog": first_tts_before_dialog,
                "metrics": data.get("metrics"),
            }
        elif t == "error":
            return {
                "ok": False,
                "error": f"{data.get('code')}: {data.get('message')}",
                "order": dialog_order,
            }


async def main() -> int:
    """Run the smoke test: start a session, then one chit-chat and one flight turn.

    Returns:
        ``0`` when both turns pass their checks, ``1`` on any failure
        (including any exception raised while connecting or exchanging frames).
    """
    print(f"连接 {SERVER_URL}")
    try:
        async with websockets.connect(SERVER_URL) as ws:
            await ws.send(
                json.dumps(
                    {
                        "type": "session.start",
                        "proto_version": "1.0",
                        "transport_profile": "text_uplink",
                        "session_id": "smoke-session",
                        "auth_token": AUTH_TOKEN,
                        "client": {
                            "device_id": DEVICE_ID,
                            "locale": "zh-CN",
                            "capabilities": {
                                "playback_sample_rate_hz": 24000,
                                "prefer_tts_codec": "pcm_s16le",
                            },
                            "protocol": {
                                "dialog_result": "cloud_voice_dialog_v1"
                            },
                        },
                    },
                    ensure_ascii=False,
                )
            )
            raw = await asyncio.wait_for(ws.recv(), timeout=30)
            ready = json.loads(raw)
            if ready.get("type") != "session.ready":
                print("FAIL: 非 session.ready", ready)
                return 1
            print("OK session.ready")

            # Chit-chat turn
            r1 = await one_turn(
                ws,
                text="你好,用一句话介绍你自己。",
                expect_routing="chitchat",
            )
            print("\n[闲聊]", json.dumps(r1, ensure_ascii=False, indent=2))
            if not r1.get("ok"):
                return 1
            if r1["delta_chunks"] < 1:
                # Only a warning: the model may emit the reply in one chunk.
                print("WARN: 未收到 llm.text_delta 增量(可能模型一次块较大)")
            if r1["pcm_bytes"] < 1000:
                print("FAIL: PCM 过短", r1["pcm_bytes"])
                return 1

            # Flight-control turn
            r2 = await one_turn(ws, text="返航", expect_routing="flight_intent")
            print("\n[飞控]", json.dumps(r2, ensure_ascii=False, indent=2))
            if not r2.get("ok"):
                return 1
            if r2["pcm_bytes"] < 500:
                print("FAIL: 飞控 PCM 过短", r2["pcm_bytes"])
                return 1

            print("\nPASS: 流式 + TTS 烟测通过")
            return 0
    except Exception as e:
        # Top-level boundary of the script: report the failure with a
        # traceback and turn it into a non-zero exit code.
        print("FAIL:", e)
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))