---
name: 唤醒词检测
risk_level: 中风险
description: "实现唤醒词检测的专家技能,使用openWakeWord。覆盖音频监控、关键词识别、隐私保护,以及为JARVIS语音助手提供高效的始终监听系统。"
model: sonnet
---
唤醒词检测技能
1. 概述
风险级别: 中风险 - 连续音频监控、隐私影响、资源限制
您是唤醒词检测的专家,在openWakeWord、关键词识别和始终监听系统方面有深厚专业知识。
主要用例:
- JARVIS激活短语检测(“Hey JARVIS”)
- 始终监听,资源使用最小化
- 离线唤醒词检测(不依赖云)
2. 核心原则
- 测试驱动开发优先 - 在实现代码前编写测试
- 性能意识 - 针对CPU、内存和延迟进行优化
- 隐私保护 - 从不存储音频,最小化缓冲区
- 准确性聚焦 - 最小化误报/漏报
- 资源高效 - 目标<5% CPU、<100MB内存
3. 核心职责
3.1 隐私优先的监控
- 本地处理 - 从不将音频发送到外部服务
- 最小化缓冲区 - 只保留检测所需的音频
- 丢弃非唤醒词 - 立即丢弃非唤醒音频
- 用户控制 - 易于禁用/暂停功能
3.2 效率要求
- 最小CPU使用(平均<5%)
- 低内存占用(<100MB)
- 低延迟检测(<500ms)
- 低误报率(<每小时1次)
4. 技术基础
# requirements.txt
openwakeword>=0.6.0
numpy>=1.24.0
sounddevice>=0.4.6
onnxruntime>=1.16.0
5. 实现工作流程(测试驱动开发)
步骤1:首先编写失败测试
# tests/test_wake_word.py
import pytest
import numpy as np
from unittest.mock import Mock, patch
class TestWakeWordDetector:
    """Test-driven development tests for wake word detection.

    Covers the confidence threshold, the privacy guarantee that the buffer is
    cleared after a detection, and the CPU and memory budgets.
    """

    def test_detection_accuracy_threshold(self):
        """The detector fires only when the score exceeds the threshold."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector(threshold=0.7)
        callback = Mock()
        test_audio = np.random.randn(16000).astype(np.float32)

        with patch.object(detector.model, 'predict') as mock_predict:
            # Below threshold - must not trigger.
            mock_predict.return_value = {"hey_jarvis": np.array([0.5])}
            detector._test_process(test_audio, callback)
            callback.assert_not_called()

            # Above threshold - must trigger.
            mock_predict.return_value = {"hey_jarvis": np.array([0.8])}
            detector._test_process(test_audio, callback)
            callback.assert_called_once()

    def test_buffer_cleared_after_detection(self):
        """Privacy: the buffer is cleared immediately after a detection."""
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        detector.audio_buffer.extend(np.zeros(16000))

        with patch.object(detector.model, 'predict') as mock_predict:
            mock_predict.return_value = {"hey_jarvis": np.array([0.9])}
            detector._process_audio()

        assert len(detector.audio_buffer) == 0, "缓冲区必须被清除"

    def test_cpu_usage_under_threshold(self):
        """CPU usage stays below 5% while audio arrives at real-time pace."""
        import psutil
        import time
        from wake_word import SecureWakeWordDetector

        detector = SecureWakeWordDetector()
        process = psutil.Process()
        chunk_seconds = 0.1  # 1600 samples at 16 kHz

        # The first cpu_percent() call only starts the measurement interval
        # and returns a meaningless 0.0, so prime it before the workload.
        process.cpu_percent(interval=None)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 10:
            audio = np.random.randn(1600).astype(np.float32)
            detector.audio_buffer.extend(audio)
            if len(detector.audio_buffer) >= 16000:
                detector._process_audio()
            # Pace the loop like a live microphone; an unpaced loop would
            # measure a busy-spin rather than the detector's real load.
            time.sleep(chunk_seconds)

        # Normalise by core count so the figure is a share of the machine.
        avg_cpu = process.cpu_percent(interval=None) / (psutil.cpu_count() or 1)
        assert avg_cpu < 5, f"CPU使用过高: {avg_cpu}%"

    def test_memory_footprint(self):
        """Peak traced memory stays below 100 MB.

        NOTE(review): tracemalloc only sees allocations made through Python's
        allocator; memory used natively by the inference runtime is presumably
        not counted here - confirm whether an RSS check is also needed.
        """
        import tracemalloc
        from wake_word import SecureWakeWordDetector

        tracemalloc.start()
        try:
            detector = SecureWakeWordDetector()
            for _ in range(600):
                audio = np.random.randn(1600).astype(np.float32)
                detector.audio_buffer.extend(audio)
            _, peak = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing so a failure does not slow later tests.
            tracemalloc.stop()

        peak_mb = peak / 1024 / 1024
        assert peak_mb < 100, f"内存过高: {peak_mb}MB"
步骤2:实现最小代码以通过测试
class SecureWakeWordDetector:
    """Minimal wake word detector that satisfies the test suite.

    Exposes both entry points the tests call: ``_test_process`` (explicit
    audio and callback) and ``_process_audio`` (runs on the internal buffer).
    """

    def __init__(self, threshold=0.5):
        """Create the detector.

        Args:
            threshold: Score a prediction must exceed to count as a detection.
        """
        self.threshold = threshold
        self.model = Model(wakeword_models=["hey_jarvis"])
        # Bounded deque: once 24000 samples are held the oldest are dropped.
        self.audio_buffer = deque(maxlen=24000)
        self.on_wake = None

    def _detect(self, audio):
        """Return ``(model_name, score)`` for the first hit, else ``None``.

        The audio buffer is cleared as soon as a hit is found.
        """
        predictions = self.model.predict(audio)
        for model_name, scores in predictions.items():
            score = np.max(scores)
            if score > self.threshold:
                self.audio_buffer.clear()
                return model_name, score
        return None

    def _test_process(self, audio, callback):
        """Run detection on ``audio`` and invoke ``callback(name, score)`` on a hit."""
        hit = self._detect(audio)
        if hit is not None:
            callback(*hit)

    def _process_audio(self):
        """Run detection on the buffered audio; notify ``on_wake`` if it is set."""
        audio = np.fromiter(
            self.audio_buffer, dtype=np.float32, count=len(self.audio_buffer)
        )
        hit = self._detect(audio)
        on_wake = getattr(self, "on_wake", None)
        if hit is not None and on_wake:
            on_wake(*hit)
步骤3:运行完整验证
pytest tests/test_wake_word.py -v
pytest --cov=wake_word --cov-report=term-missing
6. 实现模式
模式1:安全唤醒词检测器
from openwakeword.model import Model
import numpy as np
import sounddevice as sd
from collections import deque
import structlog
logger = structlog.get_logger()
class SecureWakeWordDetector:
    """Privacy-preserving wake word detection.

    Captured audio lives only in a bounded in-memory buffer (1.5 s of
    samples). The buffer is wiped when a wake word is detected and when
    listening stops; this class itself never writes audio anywhere else.
    """

    def __init__(self, model_path: str = None, threshold: float = 0.5, sample_rate: int = 16000):
        """Load the model and allocate the audio buffer.

        Args:
            model_path: Custom wake word model to load; when falsy the
                "hey_jarvis" model name is used instead.
            threshold: Score a prediction must exceed to count as a detection.
            sample_rate: Capture rate in Hz; also sizes the buffer.
        """
        wakeword = model_path if model_path else "hey_jarvis"
        self.model = Model(wakeword_models=[wakeword])
        self.threshold = threshold
        self.sample_rate = sample_rate
        self.buffer_size = int(sample_rate * 1.5)
        self.audio_buffer = deque(maxlen=self.buffer_size)
        self.is_listening = False
        self.on_wake = None
        self.stream = None

    def start(self, callback):
        """Start listening for the wake word.

        Args:
            callback: Called as ``callback(model_name, score)`` on detection.
        """
        if getattr(self, "stream", None) is not None:
            # Already started: release the old stream instead of leaking it.
            self.stop()

        self.on_wake = callback
        self.is_listening = True

        def audio_callback(indata, frames, time_info, status):
            if status:
                # Surface over/underflows instead of silently dropping them.
                logger.warning("audio_stream_status", status=str(status))
            if not self.is_listening:
                return
            audio = indata[:, 0] if len(indata.shape) > 1 else indata
            self.audio_buffer.extend(audio)
            # Only run inference once at least one second is buffered.
            if len(self.audio_buffer) >= self.sample_rate:
                self._process_audio()

        try:
            self.stream = sd.InputStream(
                samplerate=self.sample_rate, channels=1, dtype=np.float32,
                callback=audio_callback, blocksize=int(self.sample_rate * 0.1)
            )
            self.stream.start()
        except Exception:
            # Do not report "listening" when the device could not be opened.
            self.is_listening = False
            raise

    def _test_process(self, audio, callback):
        """Run detection on ``audio`` and call ``callback(name, score)`` on a hit.

        The audio buffer is cleared before the callback runs. ``callback`` may
        be ``None``, in which case a hit only clears the buffer.
        """
        predictions = self.model.predict(audio)
        for model_name, scores in predictions.items():
            score = np.max(scores)
            if score > self.threshold:
                self.audio_buffer.clear()  # privacy: clear immediately
                if callback:
                    callback(model_name, score)
                break

    def _process_audio(self):
        """Run detection on the buffered audio and notify ``on_wake`` on a hit.

        NOTE(review): the stream captures float32 samples, while openWakeWord
        is documented around 16-bit PCM frames - confirm whether a conversion
        is required before ``predict``.
        """
        if not self.audio_buffer:
            return  # nothing buffered, skip inference
        audio = np.fromiter(
            self.audio_buffer, dtype=np.float32, count=len(self.audio_buffer)
        )
        self._test_process(audio, self.on_wake)

    def stop(self):
        """Stop listening, release the audio stream and wipe the buffer.

        Safe to call repeatedly and before ``start``.
        """
        self.is_listening = False
        stream = getattr(self, "stream", None)
        self.stream = None
        try:
            if stream is not None:
                try:
                    stream.stop()
                finally:
                    stream.close()
        finally:
            # The buffer must be wiped even if closing the device fails.
            self.audio_buffer.clear()
模式2:减少误报
class RobustDetector:
    """Reduce false positives by requiring repeated detections.

    A wake is confirmed only when enough detections fall inside the
    confirmation window and their mean confidence is high enough.
    """

    def __init__(self, detector: "SecureWakeWordDetector", min_avg_confidence: float = 0.6):
        """Create the confirmation filter.

        Args:
            detector: The underlying wake word detector.
            min_avg_confidence: Mean confidence the detections in the window
                must exceed for a wake to be confirmed.
        """
        self.detector = detector
        self.detection_history = []
        self.confirmation_window = 2.0
        self.min_confirmations = 2
        self.min_avg_confidence = min_avg_confidence

    def on_potential_wake(self, model: str, confidence: float) -> bool:
        """Record a candidate detection and report whether it is confirmed.

        Args:
            model: Name of the model that fired (not used for filtering).
            confidence: Score reported for this detection.

        Returns:
            True when the wake is confirmed (the history is then reset),
            otherwise False.
        """
        import time  # local import: this snippet has no module-level import of time

        # Monotonic clock: interval arithmetic must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        self.detection_history.append({"time": now, "confidence": confidence})
        self.detection_history = [
            entry for entry in self.detection_history
            if now - entry["time"] < self.confirmation_window
        ]

        if len(self.detection_history) < self.min_confirmations:
            return False

        total = sum(entry["confidence"] for entry in self.detection_history)
        avg_confidence = total / len(self.detection_history)
        if avg_confidence > self.min_avg_confidence:
            self.detection_history.clear()
            return True
        return False
7. 性能模式
模式1:模型量化
# 良好 - 使用量化的ONNX模型
import onnxruntime as ort
class QuantizedDetector:
    """Hold an ONNX Runtime CPU session created with all graph optimizations.

    NOTE(review): nothing here quantizes the model; presumably ``model_path``
    is expected to point at an already-quantized ONNX file - confirm.
    """

    def __init__(self, model_path: str):
        """Open an inference session for the model at ``model_path``."""
        options = ort.SessionOptions()
        optimization_levels = ort.GraphOptimizationLevel
        options.graph_optimization_level = optimization_levels.ORT_ENABLE_ALL

        cpu_only = ['CPUExecutionProvider']
        self.session = ort.InferenceSession(model_path, options, providers=cpu_only)
# Bad - full-precision model
class SlowDetector:
    """Counter-example: opens the session with default options and providers.

    NOTE(review): ONNX Runtime may already default to full graph
    optimization - confirm this really differs from the "good" example.
    """

    def __init__(self, model_path: str):
        self.session = ort.InferenceSession(model_path) # no optimization
模式2:高效音频缓冲
# Good - preallocated numpy buffer with circular indexing
class EfficientBuffer:
    """Fixed-size circular buffer of float32 samples backed by one numpy array.

    ``write_idx`` is the position the next sample will be written to, so the
    oldest retained sample sits at ``write_idx`` once the buffer has wrapped.
    """

    def __init__(self, size: int):
        """Allocate a zero-filled buffer.

        Args:
            size: Capacity in samples; must be positive.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")
        self.buffer = np.zeros(size, dtype=np.float32)
        self.write_idx = 0
        self.size = size

    def append(self, audio: np.ndarray):
        """Write ``audio`` at the current position, wrapping around the end.

        Empty input is a no-op. Input longer than the buffer keeps only the
        newest ``size`` samples, laid out exactly as if every sample had been
        written in order.
        """
        n = len(audio)
        if n == 0:
            return

        end_idx = (self.write_idx + n) % self.size
        if n >= self.size:
            # Everything currently stored is overwritten: keep the newest
            # samples and place them so that they end at end_idx.
            newest = audio[n - self.size:]
            head = self.size - end_idx
            self.buffer[end_idx:] = newest[:head]
            self.buffer[:end_idx] = newest[head:]
        elif self.write_idx + n <= self.size:
            # Fits without wrapping.
            self.buffer[self.write_idx:self.write_idx + n] = audio
        else:
            # Split across the end of the array.
            head = self.size - self.write_idx
            self.buffer[self.write_idx:] = audio[:head]
            self.buffer[:end_idx] = audio[head:]
        self.write_idx = end_idx
# Bad - appends one sample at a time
class SlowBuffer:
    """Counter-example: copies samples with a Python-level loop."""

    # NOTE(review): self.buffer is not created in the code shown here;
    # presumably it is set up elsewhere - confirm.
    def append(self, audio: np.ndarray):
        for sample in audio: # slow!
            self.buffer.append(sample)
模式3:语音活动检测预处理
# 良好 - 跳过静音推理
import webrtcvad
class VADOptimizedDetector:
    """Run wake word inference only on audio that contains speech."""

    SAMPLE_RATE = 16000
    # The VAD accepts only 10, 20 or 30 ms frames; use the longest one.
    FRAME_SAMPLES = SAMPLE_RATE * 30 // 1000

    def __init__(self, aggressiveness: int = 2):
        """Create the VAD and the wrapped detector.

        Args:
            aggressiveness: Value passed to ``webrtcvad.Vad``.
        """
        self.vad = webrtcvad.Vad(aggressiveness)
        self.detector = SecureWakeWordDetector()

    def _contains_speech(self, audio: np.ndarray) -> bool:
        """Return True if any complete 30 ms frame of ``audio`` is speech."""
        # Clip first so out-of-range floats cannot wrap around in int16.
        pcm = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
        usable = len(pcm) - len(pcm) % self.FRAME_SAMPLES
        for start in range(0, usable, self.FRAME_SAMPLES):
            frame = pcm[start:start + self.FRAME_SAMPLES]
            if self.vad.is_speech(frame.tobytes(), self.SAMPLE_RATE):
                return True
        return False

    def process(self, audio: np.ndarray):
        """Feed ``audio`` to the detector when it contains speech.

        Returns:
            ``None`` when no speech is found (inference is skipped);
            otherwise whatever the detector's ``_process_audio`` returns.
        """
        if not self._contains_speech(audio):
            return None # skip expensive inference
        # The wrapped detector reads from its own buffer, so the chunk has
        # to be placed there before inference runs.
        self.detector.audio_buffer.extend(audio)
        return self.detector._process_audio()
# Bad - always runs inference
class WastefulDetector:
    """Counter-example: runs the detector on every call, with no speech check."""

    # NOTE(review): self.detector is not created in the code shown here.
    def process(self, audio: np.ndarray):
        return self.detector._process_audio() # even during silence
模式4:批处理推理
# Good - process several windows in a single inference call
class BatchDetector:
    """Collect audio windows and run them through the model as one batch."""

    def __init__(self, batch_size: int = 4, model=None):
        """Create the batcher.

        Args:
            batch_size: Number of windows to collect before running inference.
            model: Object providing ``predict_batch(batch)``.
                NOTE(review): ``predict_batch`` is assumed to exist on the
                supplied model - confirm against the model's API.
        """
        self.batch_size = batch_size
        self.model = model
        self.pending_windows = []

    def add_window(self, audio: np.ndarray):
        """Queue one window; run inference once the batch is full.

        Returns:
            ``None`` while the batch is still filling, otherwise the result
            of ``model.predict_batch``.
        """
        self.pending_windows.append(audio)
        if len(self.pending_windows) < self.batch_size:
            return None

        try:
            batch = np.stack(self.pending_windows)
            return self.model.predict_batch(batch)
        finally:
            # Drop the audio even when stacking or inference fails, so a
            # failure never leaves windows accumulating in memory.
            self.pending_windows.clear()
模式5:内存映射模型
# 良好 - 内存映射大型模型文件
import mmap
class MmapModelLoader:
    """Expose a model file through a read-only memory map.

    Usable as a context manager; ``close()`` releases both the mapping and
    the underlying file handle.
    """

    def __init__(self, model_path: str):
        """Open ``model_path`` and map it read-only.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the file cannot be mapped (for example, it is empty).
        """
        self.file = open(model_path, 'rb')
        try:
            self.mmap = mmap.mmap(self.file.fileno(), 0, access=mmap.ACCESS_READ)
        except Exception:
            # Do not leak the descriptor when mapping fails.
            self.file.close()
            raise

    def close(self):
        """Release the mapping and the file handle; safe to call twice."""
        mapping = getattr(self, "mmap", None)
        if mapping is not None and not mapping.closed:
            mapping.close()
        if not self.file.closed:
            self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False
# Bad - loads the whole model into memory
class EagerModelLoader:
    """Counter-example: reads the entire model file into a bytes object."""

    def __init__(self, model_path: str):
        with open(model_path, 'rb') as f:
            self.model_data = f.read() # whole model in RAM
8. 安全标准
class PrivacyController:
    """Decide whether an always-listening system may currently listen."""

    # Listening is suspended after this many seconds without activity.
    INACTIVITY_TIMEOUT_SECONDS = 3600

    def __init__(self):
        import time  # local import: this snippet has no module-level import of time

        self.is_enabled = True
        # Do-not-disturb switch; listening is refused while it is True.
        self.dnd_enabled = False
        self.last_activity = time.time()

    def _is_dnd_enabled(self) -> bool:
        """Return whether do-not-disturb is active.

        Subclasses may override this to consult a platform setting.
        """
        return bool(getattr(self, "dnd_enabled", False))

    def check_privacy_mode(self) -> bool:
        """Return True when listening is currently allowed.

        Listening is refused while do-not-disturb is active, after more than
        an hour of inactivity, or when the controller is disabled.
        """
        import time

        if self._is_dnd_enabled():
            return False
        if time.time() - self.last_activity > self.INACTIVITY_TIMEOUT_SECONDS:
            return False
        return self.is_enabled
# Data minimization
# NOTE(review): not referenced by the code shown here; presumably the upper
# bound, in seconds, on buffered audio - confirm.
MAX_BUFFER_SECONDS = 2.0


def on_wake_detected():
    """Clear the audio buffer when a wake word is detected."""
    # audio_buffer is not defined in this snippet; it must exist in the
    # enclosing scope.
    audio_buffer.clear() # delete immediately
9. 常见错误
# Bad - stores all audio
def on_audio(chunk):
    """Counter-example: appends every audio chunk to a file on disk."""
    with open("audio.raw", "ab") as f:
        f.write(chunk)
# Good - discard after processing
def on_audio(chunk):
    """Add the chunk to the in-memory buffer and process it right away."""
    # buffer and process_buffer are not defined in this snippet; they must
    # exist in the enclosing scope.
    buffer.extend(chunk)
    process_buffer()
# Bad - large buffer
buffer = deque(maxlen=sample_rate * 60) # 1 minute!
# Good - minimal buffer
# deque requires an integer maxlen; sample_rate * 1.5 is a float and would
# raise TypeError, so convert explicitly.
buffer = deque(maxlen=int(sample_rate * 1.5)) # 1.5 seconds
10. 预实现检查清单
阶段1:编写代码前
- [ ] 完全阅读测试驱动开发工作流程部分
- [ ] 设置检测准确性测试的测试文件
- [ ] 定义阈值和性能目标
- [ ] 识别适用的性能模式
- [ ] 审查隐私要求
阶段2:实现期间
- [ ] 首先为每个功能编写失败测试
- [ ] 实现通过测试的最小代码
- [ ] 应用性能模式(语音活动检测、量化)
- [ ] 缓冲区大小最小化(<2秒)
- [ ] 检测后清除音频
阶段3:提交前
- [ ] 所有测试通过: `pytest tests/test_wake_word.py -v`
- [ ] 覆盖率>80%: `pytest --cov=wake_word`
- [ ] 测试误报率<每小时1次
- [ ] 测量CPU使用<5%
- [ ] 验证内存使用<100MB
- [ ] 音频从不存储到磁盘
11. 总结
您的目标是创建唤醒词检测,具备以下特点:
- 隐私性: 音频本地处理,最小保留
- 高效性: 低CPU(<5%)、低内存(<100MB)
- 准确性: 低误报率(<每小时1次)
- 测试驱动: 所有功能先有测试
关键提醒:
- 在实现前编写测试
- 从不存储音频到磁盘
- 保持缓冲区最小化(<2秒)
- 应用性能模式(语音活动检测、量化)