"""Audio processing utilities."""
import numpy as np
|
||
from typing import Tuple
|
||
|
||
|
||
def resample_audio(
    audio: np.ndarray,
    orig_sr: int,
    target_sr: int,
) -> np.ndarray:
    """Resample 1-D audio using simple linear interpolation.

    Args:
        audio: Audio samples (assumed 1-D; TODO confirm callers never pass multi-channel).
        orig_sr: Original sample rate in Hz.
        target_sr: Target sample rate in Hz.

    Returns:
        Resampled audio, cast back to the input's dtype.
    """
    if orig_sr == target_sr:
        return audio

    # Guard: np.interp raises ValueError on an empty sample-point array,
    # so return empty input unchanged instead of crashing.
    if len(audio) == 0:
        return audio

    duration = len(audio) / orig_sr
    num_samples = int(duration * target_sr)

    # Linear interpolation: map the new sample grid onto fractional
    # positions in the original index space.
    old_indices = np.arange(len(audio))
    new_indices = np.linspace(0, len(audio) - 1, num_samples)

    resampled = np.interp(new_indices, old_indices, audio).astype(audio.dtype)

    return resampled
def normalize_audio(audio: np.ndarray, target_peak: float = 0.92) -> np.ndarray:
    """Scale audio so its absolute peak equals ``target_peak``.

    Args:
        audio: Audio samples.
        target_peak: Desired peak amplitude after scaling.

    Returns:
        Peak-normalized audio; an all-zero (silent) signal is returned unchanged.
    """
    max_amp = np.max(np.abs(audio))
    if max_amp <= 0:
        # Silent input: nothing to scale, avoid division by zero.
        return audio
    return audio * (target_peak / max_amp)
||
def apply_gain(audio: np.ndarray, gain: float = 1.0) -> np.ndarray:
    """Scale the signal amplitude by a constant factor.

    Args:
        audio: Audio samples.
        gain: Multiplicative gain factor.

    Returns:
        The gain-adjusted audio.
    """
    scaled = gain * audio
    return scaled
||
def apply_fade(
    audio: np.ndarray,
    sample_rate: int,
    fade_ms: int = 10,
) -> np.ndarray:
    """Apply a linear fade-in and fade-out at the signal's edges.

    Args:
        audio: Audio samples. The input is NOT modified; a faded copy is
            returned. (The original wrote through the input slices, silently
            mutating the caller's buffer — fixed here.)
        sample_rate: Sample rate in Hz.
        fade_ms: Fade duration in milliseconds.

    Returns:
        A new array with linear fades applied at both ends, or the input
        unchanged when the signal is too short to fade.
    """
    fade_samples = int(sample_rate * fade_ms / 1000)
    # Never let fade-in and fade-out overlap past the midpoint.
    fade_samples = min(fade_samples, len(audio) // 2)

    if fade_samples <= 0:
        return audio

    # Copy before writing so the caller's array is left intact.
    audio = audio.copy()

    # Fade in: ramp 0 -> 1 over the first fade_samples samples.
    fade_in = np.linspace(0, 1, fade_samples)
    audio[:fade_samples] = audio[:fade_samples] * fade_in

    # Fade out: ramp 1 -> 0 over the last fade_samples samples.
    fade_out = np.linspace(1, 0, fade_samples)
    audio[-fade_samples:] = audio[-fade_samples:] * fade_out

    return audio
||
def pcm_to_int16(audio: np.ndarray) -> bytes:
    """Convert float audio in [-1.0, 1.0] to a raw PCM int16 byte stream.

    Args:
        audio: Floating-point audio samples in the range -1.0 to 1.0.

    Returns:
        Little-endian signed 16-bit PCM bytes.
    """
    # Scale to the int16 range, clamp out-of-range samples, then cast to
    # explicit little-endian int16 before serializing.
    scaled = audio * 32767
    clamped = np.clip(scaled, -32768, 32767)
    return clamped.astype('<i2').tobytes()
||
def tts_chunk_to_pcm_s16le(chunk: np.ndarray) -> bytes:
    """Convert a TTS engine output chunk to PCM s16le bytes, matching the
    WebSocket ``tts`` downstream format.

    Kokoro emits float32 roughly in [-1, 1]; Piper usually emits int16.

    Args:
        chunk: Audio chunk from the TTS engine (array-like).

    Returns:
        Little-endian signed 16-bit PCM byte stream.
    """
    a = np.asarray(chunk)
    # Treat ANY floating dtype as normalized [-1, 1] audio. The original
    # checked only float32/float64, so a float16 chunk would fall through
    # to the integer branch unscaled.
    if np.issubdtype(a.dtype, np.floating):
        return np.clip(a.astype(np.float64) * 32767.0, -32768, 32767).astype("<i2").tobytes()
    # Integer input: clamp defensively into the int16 range and reinterpret.
    return np.clip(a, -32768, 32767).astype("<i2").tobytes()