223 lines
7.4 KiB
Python
223 lines
7.4 KiB
Python
"""
|
||
机外工具执行:天气(Open-Meteo)、搜索摘要(DuckDuckGo Instant Answer)。
|
||
仅用标准库 HTTP,无额外依赖。
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from typing import Any, Mapping
|
||
|
||
from loguru import logger
|
||
|
||
# WMO Weather interpretation codes (Open-Meteo),精简中文
|
||
_WMO_SHORT = {
|
||
0: "晴",
|
||
1: "大部晴",
|
||
2: "局部多云",
|
||
3: "多云",
|
||
45: "雾",
|
||
48: "雾凇",
|
||
51: "小毛毛雨",
|
||
53: "中毛毛雨",
|
||
55: "大毛毛雨",
|
||
56: "冻毛毛雨",
|
||
57: "强冻毛毛雨",
|
||
61: "小雨",
|
||
63: "中雨",
|
||
65: "大雨",
|
||
66: "冻雨",
|
||
67: "强冻雨",
|
||
71: "小雪",
|
||
73: "中雪",
|
||
75: "大雪",
|
||
77: "雪粒",
|
||
80: "小阵雨",
|
||
81: "阵雨",
|
||
82: "强阵雨",
|
||
85: "小阵雪",
|
||
86: "阵雪",
|
||
95: "雷暴",
|
||
96: "雷暴伴冰雹",
|
||
99: "强雷暴伴冰雹",
|
||
}
|
||
|
||
|
||
def _http_get_json(url: str, timeout: float = 12.0) -> Any:
|
||
req = urllib.request.Request(
|
||
url,
|
||
headers={"User-Agent": "VoiceLLMCloud/1.0 (tool_executor)"},
|
||
method="GET",
|
||
)
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
raw = resp.read().decode("utf-8", errors="replace")
|
||
return json.loads(raw)
|
||
|
||
|
||
def _weather_desc(code: int | None) -> str:
|
||
if code is None:
|
||
return "未知"
|
||
return _WMO_SHORT.get(int(code), f"代码{code}")
|
||
|
||
|
||
def get_current_weather(location: str) -> str:
|
||
"""
|
||
Open-Meteo:地理编码 + current 天气,无需 API Key。
|
||
"""
|
||
loc = (location or "").strip()
|
||
if not loc:
|
||
return "错误:地点为空"
|
||
|
||
try:
|
||
q = urllib.parse.quote(loc)
|
||
geo_url = (
|
||
"https://geocoding-api.open-meteo.com/v1/search"
|
||
f"?name={q}&count=1&language=zh&format=json"
|
||
)
|
||
geo = _http_get_json(geo_url)
|
||
results = geo.get("results") if isinstance(geo, dict) else None
|
||
if not results:
|
||
return f"未找到地点「{loc}」,请说具体城市名。"
|
||
g0 = results[0]
|
||
lat, lon = float(g0["latitude"]), float(g0["longitude"])
|
||
label = g0.get("name") or loc
|
||
admin = g0.get("admin1") or g0.get("country") or ""
|
||
place = f"{label}" + (f"({admin})" if admin else "")
|
||
|
||
lat_q = urllib.parse.quote(str(lat))
|
||
lon_q = urllib.parse.quote(str(lon))
|
||
wx_url = (
|
||
"https://api.open-meteo.com/v1/forecast?"
|
||
f"latitude={lat_q}&longitude={lon_q}"
|
||
"¤t=temperature_2m,relative_humidity_2m,apparent_temperature,"
|
||
"weather_code,wind_speed_10m,wind_direction_10m"
|
||
"&timezone=auto&forecast_days=1"
|
||
)
|
||
wx = _http_get_json(wx_url)
|
||
cur = wx.get("current") if isinstance(wx, dict) else None
|
||
if not cur:
|
||
return f"无法读取 {place} 的实时天气数据。"
|
||
|
||
t = cur.get("temperature_2m")
|
||
feels = cur.get("apparent_temperature")
|
||
rh = cur.get("relative_humidity_2m")
|
||
wcode = cur.get("weather_code")
|
||
ws = cur.get("wind_speed_10m")
|
||
wd = cur.get("wind_direction_10m")
|
||
desc = _weather_desc(wcode)
|
||
|
||
parts = [
|
||
f"{place}当前天气:{desc}",
|
||
f"气温约{t}°C" if t is not None else "",
|
||
f"体感约{feels}°C" if feels is not None else "",
|
||
f"相对湿度约{rh}%" if rh is not None else "",
|
||
]
|
||
if ws is not None:
|
||
wind_part = f"风速约{ws}m/s"
|
||
if wd is not None:
|
||
wind_part += f"、风向{wd}°"
|
||
parts.append(wind_part)
|
||
text = ",".join(p for p in parts if p)
|
||
return text or "天气数据不完整"
|
||
except urllib.error.URLError as e:
|
||
logger.warning(f"[tool] weather URLError: {e}")
|
||
return f"天气服务暂时不可用:{e}"
|
||
except Exception as e:
|
||
logger.warning(f"[tool] weather error: {e}")
|
||
return f"查询天气失败:{e}"
|
||
|
||
|
||
def web_search(query: str) -> str:
|
||
"""
|
||
DuckDuckGo HTML API:Instant Answer + 相关主题摘要,无 API Key。
|
||
命中差时返回简短说明,避免模型胡编。
|
||
"""
|
||
q = (query or "").strip()[:200]
|
||
if not q:
|
||
return "错误:搜索词为空"
|
||
|
||
try:
|
||
qs = urllib.parse.quote(q)
|
||
url = (
|
||
"https://api.duckduckgo.com/"
|
||
f"?q={qs}&format=json&no_html=1&no_redirect=1&t=voicellmcloud"
|
||
)
|
||
# DDG 偶发对 urllib 限流;仍优于无搜索
|
||
data = _http_get_json(url, timeout=14.0)
|
||
if not isinstance(data, dict):
|
||
return "搜索无结构化结果,请换个说法或稍后重试。"
|
||
|
||
chunks: list[str] = []
|
||
ab = data.get("AbstractText")
|
||
if isinstance(ab, str) and ab.strip():
|
||
chunks.append(ab.strip())
|
||
ans = data.get("Answer")
|
||
if isinstance(ans, str) and ans.strip():
|
||
chunks.append(ans.strip())
|
||
elif ans not in (None, ""):
|
||
chunks.append(str(ans))
|
||
|
||
for rt in (data.get("RelatedTopics") or [])[:4]:
|
||
if isinstance(rt, dict) and isinstance(rt.get("Text"), str):
|
||
t = rt["Text"].strip()
|
||
if t:
|
||
chunks.append(t)
|
||
elif isinstance(rt, dict) and "Topics" in rt:
|
||
for sub in (rt.get("Topics") or [])[:2]:
|
||
if isinstance(sub, dict) and isinstance(sub.get("Text"), str):
|
||
s = sub["Text"].strip()
|
||
if s:
|
||
chunks.append(s)
|
||
|
||
if not chunks:
|
||
heading = data.get("Heading")
|
||
if isinstance(heading, str) and heading.strip():
|
||
return (
|
||
f"未检索到「{q}」的摘要(仅标题:{heading.strip()})。"
|
||
"可改用更短关键词或指定天气类用问天气。"
|
||
)
|
||
return f"未检索到「{q}」的公开摘要,请缩短或更换关键词。"
|
||
|
||
out = "\n".join(dict.fromkeys(chunks)) # 去重保序
|
||
if len(out) > 6000:
|
||
out = out[:6000] + "\n…(截断)"
|
||
return out
|
||
except urllib.error.URLError as e:
|
||
logger.warning(f"[tool] web_search URLError: {e}")
|
||
return f"搜索服务暂时不可用:{e}"
|
||
except Exception as e:
|
||
logger.warning(f"[tool] web_search error: {e}")
|
||
return f"搜索失败:{e}"
|
||
|
||
|
||
def dispatch_tool(name: str, arguments: str | Mapping[str, Any] | None) -> str:
|
||
"""根据函数名与参数 JSON 执行工具,返回给模型的字符串。"""
|
||
if not name:
|
||
return "错误:未指定工具名"
|
||
if isinstance(arguments, str):
|
||
arg_str = arguments.strip() or "{}"
|
||
try:
|
||
parsed: Any = json.loads(arg_str)
|
||
except json.JSONDecodeError as e:
|
||
return f"工具参数不是合法 JSON:{e}"
|
||
elif isinstance(arguments, Mapping):
|
||
parsed = dict(arguments)
|
||
elif arguments is None:
|
||
parsed = {}
|
||
else:
|
||
return "错误:工具参数类型不支持"
|
||
|
||
if not isinstance(parsed, dict):
|
||
return "错误:工具参数须为 JSON 对象"
|
||
|
||
n = name.strip().lower()
|
||
if n == "get_current_weather":
|
||
loc = str(parsed.get("location", "")).strip()
|
||
return get_current_weather(loc)
|
||
if n == "web_search":
|
||
query = str(parsed.get("query", "")).strip()
|
||
return web_search(query)
|
||
return f"未知工具:{name}"
|