161 lines
4.0 KiB
Python
161 lines
4.0 KiB
Python
"""flight_intent v1 校验与 goto→Command 映射。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from voice_drone.core.flight_intent import (
|
|
goto_action_to_command,
|
|
parse_flight_intent_dict,
|
|
)
|
|
|
|
|
|
def _command_stack_available() -> bool:
|
|
try:
|
|
import yaml # noqa: F401
|
|
|
|
from voice_drone.core.command import Command # noqa: F401
|
|
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
needs_command_stack = pytest.mark.skipif(
|
|
not _command_stack_available(),
|
|
reason="需要 pyyaml 与工程配置以加载 Command",
|
|
)
|
|
|
|
|
|
def test_parse_minimal_ok():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [{"type": "land", "args": {}}],
|
|
"summary": "降落",
|
|
}
|
|
)
|
|
assert err == []
|
|
assert v is not None
|
|
assert v.actions[0].type == "land"
|
|
|
|
|
|
def test_hover_duration_cloud_legacy_expands_to_wait():
|
|
"""云端误将停顿时长写在 hover.args.duration 时,客户端规范化为 hover + wait。"""
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [
|
|
{"type": "takeoff", "args": {}},
|
|
{"type": "hover", "args": {"duration": 3}},
|
|
{"type": "land", "args": {}},
|
|
],
|
|
"summary": "test",
|
|
}
|
|
)
|
|
assert err == []
|
|
assert v is not None
|
|
assert len(v.actions) == 4
|
|
assert v.actions[0].type == "takeoff"
|
|
assert v.actions[1].type == "hover"
|
|
assert v.actions[2].type == "wait"
|
|
assert float(v.actions[2].args.seconds) == 3.0
|
|
assert v.actions[3].type == "land"
|
|
|
|
|
|
def test_wait_after_hover_ok():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [
|
|
{"type": "takeoff", "args": {}},
|
|
{"type": "hover", "args": {}},
|
|
{"type": "wait", "args": {"seconds": 2.5}},
|
|
{"type": "land", "args": {}},
|
|
],
|
|
"summary": "test",
|
|
}
|
|
)
|
|
assert err == []
|
|
assert v is not None
|
|
assert len(v.actions) == 4
|
|
assert v.actions[2].type == "wait"
|
|
assert v.trace_id is None
|
|
|
|
|
|
def test_first_wait_rejected():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [
|
|
{"type": "wait", "args": {"seconds": 1}},
|
|
{"type": "land", "args": {}},
|
|
],
|
|
"summary": "x",
|
|
}
|
|
)
|
|
assert v is None
|
|
assert err
|
|
|
|
|
|
def test_extra_top_key_rejected():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [{"type": "land", "args": {}}],
|
|
"summary": "x",
|
|
"foo": 1,
|
|
}
|
|
)
|
|
assert v is None
|
|
assert err
|
|
|
|
|
|
@needs_command_stack
|
|
def test_goto_body_forward():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [
|
|
{
|
|
"type": "goto",
|
|
"args": {"frame": "body_ned", "x": 3},
|
|
}
|
|
],
|
|
"summary": "s",
|
|
}
|
|
)
|
|
assert v is not None and not err
|
|
cmd, reason = goto_action_to_command(v.actions[0], sequence_id=7)
|
|
assert reason is None
|
|
assert cmd is not None
|
|
assert cmd.command == "forward"
|
|
assert cmd.sequence_id == 7
|
|
|
|
|
|
@needs_command_stack
|
|
def test_goto_multi_axis_no_command():
|
|
v, err = parse_flight_intent_dict(
|
|
{
|
|
"is_flight_intent": True,
|
|
"version": 1,
|
|
"actions": [
|
|
{
|
|
"type": "goto",
|
|
"args": {"frame": "body_ned", "x": 1, "y": 1},
|
|
}
|
|
],
|
|
"summary": "s",
|
|
}
|
|
)
|
|
assert v is not None and not err
|
|
cmd, reason = goto_action_to_command(v.actions[0], 1)
|
|
assert cmd is None
|
|
assert reason and "multi-axis" in reason
|