Half-Duplex 模式(VAD 半双工语音对话)

概述

Half-Duplex 模式实现免提式的自动轮流语音对话。服务端运行 SileroVAD(语音活动检测)来自动检测用户何时开始说话、何时说完。用户说完后,模型流式生成回复。播放完毕后系统恢复监听——类似电话式的轮流对话。

与 Chat 模式不同,Half-Duplex 是有状态的长时会话。GPU Worker 在整个会话期间被独占(默认 3 分钟超时)。KV Cache 在多轮对话间持续积累,模型可以利用完整的对话历史上下文,无需重新编码之前的轮次。

能力:语音输入 → 文本 + 语音输出,流式输出,多轮上下文积累,KV Cache 持久化,独占 GPU Worker。

生命周期

sequenceDiagram
    participant C as Client
    participant S as Server

    C->>S: WS /ws/half_duplex/{session_id}
    S-->>C: queued (position, ETA)
    S-->>C: queue_done
    C->>S: prepare (system_prompt + config)
    S-->>C: prepared (session_id, timeout_s)

    loop 语音对话轮次
        C->>S: audio_chunk(持续发送,每 0.5s)
        S-->>C: vad_state: speaking=true
        Note over C,S: 用户正在说话...
        S-->>C: vad_state: speaking=false
        S-->>C: generating (speech_duration_ms)
        loop 流式回复
            S-->>C: chunk (text_delta + audio_data)
        end
        S-->>C: turn_done (turn_index, 完整文本)
        Note over C,S: 客户端播放音频,播放完毕后恢复发送 audio_chunk
    end

    alt 客户端主动停止
        C->>S: stop
        S-->>C: stopped
    else 会话超时
        S-->>C: timeout (elapsed_s)
    end
    Note over C,S: GPU 释放,会话录制保存

阶段 1 — 连接与排队:客户端连接 wss://host/ws/half_duplex/{session_id},使用唯一的 session ID。服务端将请求放入 FIFO 队列。客户端收到 queued,包含排队位置和预计等待时间。当 GPU Worker 可用时,客户端收到 queue_done

阶段 2 — 准备:客户端发送 prepare,包含系统提示、VAD 参数、生成配置、TTS 设置,以及可选的参考音频用于语音克隆。服务端执行初始化:(1) 加载 SileroVAD ONNX 模型,(2) 将系统提示 prefill 到 KV Cache,(3) 使用参考音频初始化 TTS,(4) 启动会话录制器。客户端收到 prepared,包含分配的 session_idtimeout_srecording_session_id

阶段 3 — 监听循环:客户端开始持续发送 audio_chunk 消息(每 0.5 秒一次,float32 PCM 16kHz)。服务端将每个 chunk 送入 StreamingVAD。当检测到语音时,服务端发送 vad_state: {speaking: true}。客户端应显示"正在听"的指示。

冷启动保护prepared 后的前 0.5 秒,所有 VAD 结果被忽略,以过滤麦克风初始化噪音。

阶段 4 — 语音结束与生成:当 VAD 检测到持续静音(可通过 min_silence_duration_ms 配置,默认 800ms),累积的语音片段被定型。服务端先发送 vad_state: {speaking: false},紧接着发送 generating(包含 speech_duration_ms)。语音片段被编码为音频内容并 prefill 到 KV Cache,然后开始流式生成。每个生成的 token 产生一条 chunk 消息,包含 text_deltaaudio_data

阶段 5 — 轮次完成:生成结束时,服务端发送 turn_done,包含 turn_index(从 0 开始的计数器)和本轮完整 text。客户端应播放所有已接收的音频。播放期间,客户端必须停止发送 audio_chunk,以防止回声反馈(否则模型会听到自己的声音)。播放完毕后,客户端等待额外约 800ms 缓冲,然后恢复发送 audio_chunk 开始下一轮。

阶段 6 — 终止:会话以三种方式之一结束: - 客户端停止:客户端发送 stop。服务端发送 stopped 并释放 GPU。 - 超时:如果 timeout_s 秒内(默认 180)未收到 audio_chunk,服务端发送 timeout(包含 elapsed_s)并释放 GPU。 - 外部停止:通过 HTTP POST /api/half_duplex/stop 携带 session_id 可强制中断正在进行的生成。

终止后,会话录制被保存,可用于回放。

WebSocket — wss://host/ws/half_duplex/{session_id}

客户端 → 服务端

消息类型 关键字段 说明
prepare system_prompt, config, ref_audio_base64, system_content 初始化会话;必须在 queue_done 后发送的第一条消息
audio_chunk audio_base64 发送麦克风音频(float32 PCM 16kHz,每次约 0.5s)。监听阶段必须持续发送。AI 音频播放期间必须停止发送以防回声
stop 优雅停止会话并释放 GPU

prepare 示例

{
  "type": "prepare",
  "system_prompt": "You are a helpful assistant.",
  "config": {
    "vad": {
      "threshold": 0.8,
      "min_speech_duration_ms": 128,
      "min_silence_duration_ms": 800,
      "speech_pad_ms": 30
    },
    "generation": {
      "max_new_tokens": 256,
      "length_penalty": 1.1,
      "temperature": 0.7
    },
    "tts": {
      "enabled": true
    },
    "session": {
      "timeout_s": 180
    }
  },
  "ref_audio_base64": "<base64 参考音频>"
}

config 字段

分类 字段 默认值 说明
vad threshold 0.8 语音概率阈值。SileroVAD 以 1024 样本为窗口滑动检测,输出每个窗口的语音概率。概率 >= 阈值标记为"开始说话"。值越高误触越少,但可能错过轻声说话
vad min_speech_duration_ms 128 有效语音的最短时长。短于此值的片段被作为噪音丢弃
vad min_silence_duration_ms 800 确认说完所需的持续静音时长。值越小轮次切换越快,但可能在句中停顿时误判为说完
vad speech_pad_ms 30 语音片段两侧的填充时长,避免裁切词语边界
generation max_new_tokens 256 每轮最大生成 token 数
generation length_penalty 1.1 长度惩罚系数(> 1.0 鼓励更长回复)
generation temperature 0.7 采样温度
tts enabled true 启用语音回复。设为 false 时仅生成文本
session timeout_s 180 会话超时时间(秒)。每次收到 audio_chunk 时计时器重置

audio_chunk 示例

{
  "type": "audio_chunk",
  "audio_base64": "<base64 PCM float32, 16kHz, ~0.5s>"
}

服务端 → 客户端

消息在每轮内按严格的生命周期顺序到达:

消息类型 关键字段 生命周期阶段 说明
queued position, estimated_wait_s 连接 请求已入队
queue_done 连接 已出队,GPU Worker 已分配。客户端应发送 prepare
prepared session_id, timeout_s, recording_session_id 准备 会话已初始化。系统提示已 prefill,VAD 就绪,TTS 已加载。客户端应开始发送 audio_chunk
vad_state speaking (bool) 监听 VAD 状态转换。true = 检测到语音(用户开始说话)。false = 语音结束(用户停止说话)
generating speech_duration_ms 轮次开始 服务端正在处理语音片段并启动生成
chunk text_delta, audio_data 生成中 一个流式 chunk。text_delta 为增量文本;audio_data 为对应的 24kHz 音频段。客户端应按顺序缓冲并播放音频
turn_done turn_index, text 轮次结束 本轮生成完成。text 为本轮完整回复文本。客户端应播放完缓冲音频,然后延迟约 800ms 后恢复发送 audio_chunk
timeout elapsed_s 终止 会话因无活动超时。连接将关闭
error error 任意 发生错误。连接将关闭

turn_done 示例

{
  "type": "turn_done",
  "turn_index": 2,
  "text": "好的,我可以帮你解答这个问题。"
}

REST — POST /api/half_duplex/stop

从 WebSocket 连接外部强制停止正在进行的半双工生成。适用于在 UI 中实现独立于音频流的"停止"按钮。

请求体

{"session_id": "stream_abc123"}

示例:完整生命周期

JavaScript

const sessionId = 'hdx_' + Math.random().toString(36).slice(2, 10);
const ws = new WebSocket(`wss://${location.host}/ws/half_duplex/${sessionId}`);
let aiSpeaking = false;
let audioContext, captureNode;

// -- 声音克隆参考音频 (base64 PCM float32, 16kHz) --
const refAudioBase64 = getRefAudioBase64();

ws.onopen = () => console.log('已连接,等待排队...');

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  switch (msg.type) {
    case 'queued':
      console.log(`排队位置: #${msg.position},预计等待: ${msg.estimated_wait_s}s`);
      break;

    case 'queue_done':
      // GPU 已分配——发送 prepare,通过 system_content 嵌入参考音频。
      // system_content 遵循模型最佳实践格式:[text, audio, text]。
      // 其中的 audio 项用于 LLM 上下文嵌入和 TTS 声音克隆。
      ws.send(JSON.stringify({
        type: 'prepare',
        system_content: [
          { type: 'text', text: '模仿音频样本的音色并生成新的内容。' },
          { type: 'audio', data: refAudioBase64 },         // 参考音色
          { type: 'text', text: '你是一个有帮助的助手,请自然地回复。' },
        ],
        config: {
          vad: { threshold: 0.8, min_silence_duration_ms: 800 },
          generation: { max_new_tokens: 256, temperature: 0.7 },
          tts: { enabled: true },
          session: { timeout_s: 180 },
        },
      }));
      break;

    case 'prepared':
      console.log(`会话就绪(超时: ${msg.timeout_s}s)`);
      startMicCapture();  // 开始发送 audio_chunk
      break;

    case 'vad_state':
      // 服务端 VAD 检测到语音开始/结束
      console.log(msg.speaking ? '用户正在说话...' : '用户停止说话');
      break;

    case 'generating':
      // 服务端正在处理用户语音并开始生成回复,
      // 此时暂停发送麦克风音频以防止回声反馈。
      aiSpeaking = true;
      console.log(`开始生成(语音时长: ${msg.speech_duration_ms}ms)`);
      break;

    case 'chunk':
      // 流式 token:增量文本和/或音频片段
      if (msg.text_delta) process.stdout.write(msg.text_delta);
      if (msg.audio_data) playAudio(msg.audio_data);  // PCM float32, 24kHz
      break;

    case 'turn_done':
      // 本轮完成 — 等播放完毕 + 800ms 缓冲后恢复麦克风,
      // 避免采集到 AI 自身的音频输出。
      console.log(`\n轮次 ${msg.turn_index} 完成: ${msg.text}`);
      setTimeout(() => { aiSpeaking = false; }, getPlaybackRemaining() + 800);
      break;

    case 'timeout':
      console.log(`会话超时(${msg.elapsed_s}s)`);
      break;
    case 'error':
      console.error('错误:', msg.error);
      break;
  }
};

async function startMicCapture() {
  const stream = await navigator.mediaDevices.getUserMedia({ audio: { sampleRate: 16000 } });
  audioContext = new AudioContext({ sampleRate: 16000 });
  await audioContext.audioWorklet.addModule('capture-processor.js');
  const source = audioContext.createMediaStreamSource(stream);
  captureNode = new AudioWorkletNode(audioContext, 'capture-processor');
  source.connect(captureNode);

  // AudioWorklet 是事件驱动的,不是定时器:
  // 音频渲染线程实时累积麦克风采样,缓冲区满约 0.5 秒时自动触发 'chunk'。
  // 无需 sleep 或轮询。
  captureNode.port.onmessage = (e) => {
    if (e.data.type === 'chunk' && !aiSpeaking && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({
        type: 'audio_chunk',
        audio_base64: float32ToBase64(e.data.audio),
      }));
    }
  };
}

function stopSession() {
  ws.send(JSON.stringify({ type: 'stop' }));
  ws.close();
}

Python

import asyncio, json, base64
import numpy as np
import websockets

def load_ref_audio(path: str) -> str:
    """加载 WAV 文件,返回 base64 编码的 PCM float32 (16kHz)。"""
    import soundfile as sf
    audio, _ = sf.read(path, dtype="float32", samplerate=16000)
    return base64.b64encode(audio.tobytes()).decode()

def audio_file_to_chunks(path, chunk_duration=0.5, sr=16000):
    """读取音频文件,切分为 0.5 秒的 float32 块并返回 base64。"""
    import soundfile as sf
    audio, _ = sf.read(path, dtype="float32", samplerate=sr)
    chunk_size = int(sr * chunk_duration)
    for i in range(0, len(audio), chunk_size):
        yield base64.b64encode(audio[i:i + chunk_size].tobytes()).decode()

async def half_duplex_session(
    audio_path: str,
    server="wss://localhost:8006",
    ref_audio_path: str | None = "ref.wav",
):
    session_id = f"hdx_{id(object()):x}"
    url = f"{server}/ws/half_duplex/{session_id}"

    async with websockets.connect(url) as ws:
        # 1. 等待排队分配
        while True:
            msg = json.loads(await ws.recv())
            if msg["type"] == "queue_done":
                break
            print(f"排队中 #{msg.get('position')}")

        # 2. 发送 prepare — system_content 嵌入参考音频用于声音克隆。
        #    格式遵循模型最佳实践:[text, audio, text]。
        ref_b64 = load_ref_audio(ref_audio_path) if ref_audio_path else None
        system_content = [
            {"type": "text", "text": "模仿音频样本的音色并生成新的内容。"},
            {"type": "audio", "data": ref_b64},         # 参考音色
            {"type": "text", "text": "你是一个有帮助的助手,请自然地回复。"},
        ] if ref_b64 else None

        prepare_msg = {
            "type": "prepare",
            "config": {
                "vad": {"threshold": 0.8, "min_silence_duration_ms": 800},
                "generation": {"max_new_tokens": 256, "temperature": 0.7},
                "tts": {"enabled": True},
                "session": {"timeout_s": 60},
            },
        }
        if system_content:
            prepare_msg["system_content"] = system_content
        await ws.send(json.dumps(prepare_msg))

        msg = json.loads(await ws.recv())
        assert msg["type"] == "prepared"
        print(f"会话就绪: {msg['session_id']}")

        # 3. 并发发送音频和接收消息
        async def send_audio():
            for chunk_b64 in audio_file_to_chunks(audio_path):
                await ws.send(json.dumps({
                    "type": "audio_chunk",
                    "audio_base64": chunk_b64,
                }))
                # 模拟实时麦克风节奏:浏览器端 AudioWorklet 由音频渲染线程
                # 实时驱动,缓冲区满时自动触发——无需 sleep。
                # 这里从文件读取,需要 sleep 来匹配实时节奏。
                await asyncio.sleep(0.5)
            # 等待服务端完成最后一轮生成
            await asyncio.sleep(5)
            await ws.send(json.dumps({"type": "stop"}))

        async def recv_messages():
            async for raw in ws:
                msg = json.loads(raw)
                if msg["type"] == "vad_state":
                    print("正在说话..." if msg["speaking"] else "停止说话")
                elif msg["type"] == "generating":
                    print(f"开始生成(语音时长 {msg['speech_duration_ms']}ms)")
                elif msg["type"] == "chunk":
                    if msg.get("text_delta"):
                        print(msg["text_delta"], end="", flush=True)
                elif msg["type"] == "turn_done":
                    print(f"\n--- 轮次 {msg['turn_index']} 完成 ---")
                elif msg["type"] in ("stopped", "timeout"):
                    break

        await asyncio.gather(send_audio(), recv_messages())

asyncio.run(half_duplex_session("test_audio.wav"))

Processor 方法链

Half-Duplex 会话的内部处理流水线:

阶段 方法 说明
初始化 UnifiedProcessor.set_half_duplex_mode() 切换到 Half-Duplex 模式(< 0.1ms),返回 HalfDuplexView
初始化 HalfDuplexView.init_ref_audio(path)init_ref_audio_from_data(ndarray) 加载参考音频用于 TTS 语音克隆
准备 HalfDuplexView.prefill(request) 将系统提示 prefill 到 KV Cache;创建回滚快照
每轮 HalfDuplexView.prefill(request) 将用户语音片段 prefill 到 KV Cache
每轮 HalfDuplexView.generate(session_id, ...) 流式生成,yield StreamingChunk(文本 + 音频)
恢复 HalfDuplexView.can_rollback()rollback() 检查是否可回滚,然后恢复 KV Cache 到上一快照(如生成出错时)
恢复 HalfDuplexView.clear_rollback_point() 成功完成一轮后丢弃快照