下面我的项目信息:
项目架构:
A项目(Websocket客户端 / React前端) => B项目(Websocket客户端 / Java后端)=》C项目(Websocket服务端 / Node.js 后端)
项目功能:
A项目有一个开启语音输入的功能,用户开始说话,获取麦克风输入的数据,将获取到的数据传输到B项目,B项目返回一段模拟的识别文字(随机生成)给A项目,(真实逻辑应该是在C项目生成识别文字给到B,B再给我A,这里我偷懒了)。B项目继续将获取到的麦克风数据传输到C项目,C项目将收集到的音频数据存储到内存缓冲区,等A项目停止讲话的时候,C项目会将缓冲区的数据生成为一个 .wav 格式的音频文件,然后再通过 ffmpeg 将 .wav 格式的音频文件转为 .mp3 的音频文件。
上代码:
代码已经上传到了 gitee,
gitee仓库地址:
A项目和C项目:https://gitee.com/tylerzhong/websocket_frontend
B项目:https://gitee.com/tylerzhong/websocket_middleware
A项目的代码:
AudioWebSocket :(项目路径:src\pages\AudioWebSocket\index.jsx)
import { useState, useRef, useEffect } from 'react';const AudioWebSocket = () => {const [resText, setResText] = useState('');const ws = useRef(null);const record = useRef(null);const waveformCanvas = useRef(null);const audioContextRef = useRef(null);// 开始/结束对讲的数据结构const startData = { is_speaking: true, mode: "2pass", wav_name: "h5" };const endData = { is_speaking: false, mode: "2pass", wav_name: "h5" };// 录音处理器类(完整修复:补全 audioData 结构)class Recorder {constructor(stream) {this.sampleBits = 16; // 输出采样数位this.sampleRate = 16000; // 输出采样率// 完整定义 audioData 对象(包含 input 方法)this.audioData = {type: "wav",size: 0, // 录音文件长度buffer: [], // 录音缓存inputSampleRate: 48000, // 输入采样率inputSampleBits: 16, // 输入采样数位outputSampleRate: this.sampleRate, // 输出采样率outputSampleBits: this.sampleBits, // 输出采样数位// 清理缓存clear: function () {this.buffer = [];this.size = 0;},// 向缓存中添加数据input: function (data) {this.buffer.push(new Float32Array(data));this.size += data.length;},// 合并并压缩数据compress: function () {const mergedData = new Float32Array(this.size);let offset = 0;this.buffer.forEach(chunk => {mergedData.set(chunk, offset);offset += chunk.length;});// 按采样率压缩(输入采样率 / 输出采样率)const compressionRatio = this.inputSampleRate / this.outputSampleRate;const compressedLength = Math.floor(mergedData.length / compressionRatio);const compressedData = new Float32Array(compressedLength);for (let i = 0; i < compressedLength; i++) {compressedData[i] = mergedData[i * compressionRatio];}return compressedData;},// 编码为 PCM 格式encodePCM: function () {const pcmData = this.compress();const byteLength = pcmData.length * (this.outputSampleBits / 8);const buffer = new ArrayBuffer(byteLength);const dataView = new DataView(buffer);for (let i = 0; i < pcmData.length; i++) {const value = pcmData[i];const intValue = value < 0 ? Math.max(-1, value) * 0x8000 : // 负数范围:-32768 ~ 0Math.min(1, value) * 0x7FFF; // 正数范围:0 ~ 32767dataView.setInt16(i * 2, intValue, true); // 小端序}return new Blob([dataView], { type: 'audio/wav' });}};this.recording = true; // 录音状态标志// 初始化音频上下文(确保单例)audioContextRef.current = audioContextRef.current || new AudioContext();this.audioInput = audioContextRef.current.createMediaStreamSource(stream);this.recorderNode = audioContextRef.current.createScriptProcessor(4096, 1, 1);// 绑定音频处理回调(使用箭头函数确保 this 指向 Recorder 实例)this.recorderNode.onaudioprocess = (e) => {const inputBuffer = e.inputBuffer.getChannelData(0);this.audioData.input(inputBuffer);this.sendData();this.updateWaveform(inputBuffer);};}// 启动录音(连接音频节点)start() {this.audioInput.connect(this.recorderNode);this.recorderNode.connect(audioContextRef.current.destination);}// 停止录音(断开连接并标记状态)stop() {this.recording = false;this.recorderNode.disconnect();}// 发送数据到 WebSocketsendData() {if (!this.recording) return;const reader = new FileReader();reader.onload = (e) => {const rawData = e.target.result;const byteArray = new Int8Array(rawData);// 分包发送(每包 1024 字节)for (let i = 0; i < byteArray.length; i += 1024) {const chunk = byteArray.slice(i, i + 1024);ws.current.send(chunk);}};// 读取编码后的 PCM 数据reader.readAsArrayBuffer(this.audioData.encodePCM());this.audioData.clear(); // 清理缓存}// 更新音浪效果updateWaveform(inputBuffer) {const canvas = waveformCanvas.current;if (!canvas) return;const ctx = canvas.getContext('2d');const width = canvas.width;const height = canvas.height;ctx.clearRect(0, 0, width, height);ctx.fillStyle = '#106AE8';const numBars = 20;const barWidth = width / (numBars * 3);let x = 15;for (let i = 0; i < numBars; i++) {const sampleIndex = Math.floor(i * (inputBuffer.length / numBars));const barHeight = (Math.abs(inputBuffer[sampleIndex]) * height * 6) / 2;ctx.fillRect(x, height / 2 - barHeight, barWidth, barHeight * 2);x += barWidth + 4;}}}// 初始化 WebSocketconst initWebSocket = () => {ws.current = new WebSocket('ws://localhost:8081/ws/a'); // 替换为实际地址ws.current.binaryType = 'arraybuffer';ws.current.onopen = () => {ws.current.send(JSON.stringify(startData));console.log('WebSocket 连接成功');if (record.current) {record.current.start(); // 启动录音}};ws.current.onmessage = (msg) => {const res = JSON.parse(msg.data);setResText(res.text); // 更新识别文字};ws.current.onerror = (err) => {console.error('WebSocket 错误:', err);};};// 开始对讲const startIntercom = async () => {try {// 申请麦克风权限const mediaStream = await navigator.mediaDevices.getUserMedia({ audio: true });record.current = new Recorder(mediaStream); // 创建 Recorder 实例initWebSocket(); // 初始化 WebSocket} catch (error) {console.error('麦克风权限申请失败:', error);}};// 关闭对讲const endIntercom = () => {if (ws.current) {ws.current.send(JSON.stringify(endData)); // 发送结束标志record.current?.stop(); // 停止录音}};// 组件卸载清理useEffect(() => {return () => {if (ws.current) {ws.current.close(); // 关闭 WebSocket}if (audioContextRef.current) {audioContextRef.current.close(); // 关闭音频上下文}};}, []);return (<div className="mainContent"><button onClick={startIntercom}>开始对讲</button><button onClick={endIntercom}>关闭对讲</button><div>语音识别的文字为:{resText || '--'}</div><canvas ref={waveformCanvas} width="200" height="20" /></div>);
};export default AudioWebSocket;
B项目的代码:
WebSocketConfig:(项目路径:src/main/java/com/tyler/config/WebSocketConfig.java)
package com.tyler.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {// 注册 ServerEndpointExporter,用于扫描所有 @ServerEndpoint 注解的类@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
WebSocketClient:(项目路径:src/main/java/com/tyler/voice/WebSocketClient.java)
package com.tyler.voice;import jakarta.websocket.Endpoint;
import jakarta.websocket.EndpointConfig;
import jakarta.websocket.Session;
import org.glassfish.tyrus.client.ClientManager;import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;public class WebSocketClient {private String cServerUrl;private Session cSession;private CountDownLatch connectLatch = new CountDownLatch(1);public WebSocketClient(String cServerUrl) {this.cServerUrl = cServerUrl;}// 连接public void connect() {new Thread(() -> {ClientManager client = ClientManager.createClient();try {client.connectToServer(new Endpoint() {@Overridepublic void onOpen(Session session, EndpointConfig config){cSession = session;connectLatch.countDown();System.out.println("已连接到 C项目 WebSocket 服务");}}, new URI(cServerUrl));connectLatch.await(); // 等待连接建立} catch (Exception e) {System.out.println("连接 C项目失败:"+e.getMessage());}}).start();}// 发送文本消息(新增方法)public void sendText(String text) {if (cSession != null && cSession.isOpen()) {try {cSession.getBasicRemote().sendText(text); // 直接发送文本} catch (IOException e) {System.out.println("向 C项目发送文本失败:"+e.getMessage());}}}// 保留原发送二进制数据的方法(用于音频数据)public void send(byte[] data) {if (cSession != null && cSession.isOpen()) {try {cSession.getBasicRemote().sendBinary(ByteBuffer.wrap(data));} catch (IOException e) {System.out.println("向 C项目发送数据失败:"+e.getMessage());}}}// 断开与C项目的连接public void disconnect() {if (cSession != null) {try {cSession.close();} catch (IOException e) {System.out.println("断开 C项目连接失败:"+e.getMessage());}}}}
WebSocketServer:(项目路径:src/main/java/com/tyler/voice/WebSocketServer.java)
package com.tyler.voice;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
@ServerEndpoint(value = "/ws/a")
public class WebSocketServer {private static final ConcurrentHashMap<String, Session> aSessions = new ConcurrentHashMap<>();private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private WebSocketClient cClient = new WebSocketClient("ws://localhost:8082/ws/c");@OnOpenpublic void onOpen(Session session) {String sessionId = session.getId();aSessions.put(sessionId, session);System.out.println("A项目连接已建立,SessionID: " + sessionId);cClient.connect(); // 连接 C 项目}@OnMessagepublic void onMessage(Session session, String controlMsg) {System.out.println("收到控制消息:" + controlMsg);try {JSONObject jsonObject = new JSONObject();jsonObject.put("text", "----------------已经收到数据了。。。。。。。。。。。。。。。");session.getBasicRemote().sendText(JSON.toJSONString(jsonObject));} catch (IOException e) {System.out.println("发送数据到A项目失败");}cClient.sendText(controlMsg);}@OnMessagepublic void onMessage(Session session, byte[] audioData) {System.out.println("收到A项目语音数据,长度:" + audioData.length + " bytes");try {JSONObject jsonObject = new JSONObject();Random random = new Random();jsonObject.put("text", random.nextInt(10, 100));session.getBasicRemote().sendText(JSON.toJSONString(jsonObject));} catch (IOException e) {System.out.println("发送数据到A项目失败");}cClient.send(audioData);}@OnClosepublic void onClose(Session session) {String sessionId = session.getId();scheduler.schedule(() -> {cClient.disconnect();aSessions.remove(sessionId);System.out.println("A项目连接已关闭,SessionID: " + sessionId);}, 500, TimeUnit.MICROSECONDS);}@OnErrorpublic void onError(Session session, Throwable error) {// 忽略因连接关闭导致的 IOException (关键)if (!(error instanceof IOException && error.getMessage() != null && error.getMessage().contains("中止了一个已建立的连接"))) {System.out.println("A项目连接异常:"+error.getMessage());}}}
application.yml: (项目路径:src/main/resources/application.yml)
server:port: 8081
C 项目代码:
websocket-node.js:(项目路径:websocket-node.js)
const WebSocket = require('ws');
const fs = require('fs');
const { exec } = require('child_process');
const path = require('path');const wss = new WebSocket.Server({ port: 8082 });
const clientBuffers = new Map(); // 存储 { buffer: Buffer[], isEnd: boolean }// 生成唯一客户端 ID
const generateClientId = () => {return `client-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
};// 计算缓冲区总长度
const sumBufferLength = (clientId) => {return clientBuffers.get(clientId).buffer.reduce((acc, chunk) => acc + chunk.length, 0);
};// FFmpeg 转码(WAV → MP3)
const convertToMp3 = (wavPath, mp3Path, callback) => {const ffmpegCmd = `ffmpeg -i "${wavPath}" -vn -ab 128k -ar 44100 -y "${mp3Path}"`;exec(ffmpegCmd, (error, stdout, stderr) => {if (error) {callback(`转码失败: ${error.message}`);return;}callback(null, mp3Path);});
};wss.on('connection', (ws) => {const clientId = generateClientId();clientBuffers.set(clientId, { buffer: [], isEnd: false });console.log(`新客户端连接: ${clientId}`);ws.on('message', (data) => {let controlMsg = null;const isJson = data.toString().startsWith('{') && data.toString().endsWith('}'); // 快速判断是否可能是 JSONtry {if (isJson) {controlMsg = JSON.parse(data.toString());}} catch (error) {// 故意空着,让 controlMsg 保持 null}if (controlMsg !== null) {// 处理控制消息console.log(`[${clientId}] 收到控制消息:`, controlMsg);if (controlMsg.is_speaking === false) {clientBuffers.get(clientId).isEnd = true;processAudio(clientId);}} else {// 处理音频数据(支持 Buffer/ArrayBuffer/Blob)let bufferData;if (data instanceof Buffer) {bufferData = data;} else if (data instanceof Uint8Array) {bufferData = Buffer.from(data);} else if (data instanceof ArrayBuffer) {bufferData = Buffer.from(data);} else {console.warn(`[${clientId}] 不支持的数据类型:`, typeof data);return;}clientBuffers.get(clientId).buffer.push(bufferData);console.log(`[${clientId}] 收到音频数据块,当前总长度: ${sumBufferLength(clientId)}`);}});ws.on('close', () => {console.log(`[${clientId}] 连接关闭,处理剩余数据`);processAudio(clientId); // 连接关闭时处理残留数据});ws.on('error', (error) => {console.error(`[${clientId}] 连接错误:`, error);processAudio(clientId); // 错误时强制处理});
});// 处理音频数据的核心函数
const processAudio = (clientId) => {const clientData = clientBuffers.get(clientId);if (!clientData || clientData.buffer.length === 0 && !clientData.isEnd) {clientBuffers.delete(clientId);return;}const audioBuffer = Buffer.concat(clientData.buffer);clientBuffers.delete(clientId);if (audioBuffer.length === 0) {console.log(`[${clientId}] 无有效音频数据`);return;}// 保存临时 WAV 文件(补充 WAV 头)const wavBuffer = addWavHeader(audioBuffer);const tempWavPath = path.join(__dirname, 'temp', `${clientId}.wav`);const outputMp3Path = path.join(__dirname, 'output', `${clientId}.mp3`);fs.writeFile(tempWavPath, wavBuffer, (err) => {if (err) {console.error(`[${clientId}] 写入临时文件失败:`, err);return;}convertToMp3(tempWavPath, outputMp3Path, (error, mp3Path) => {if (error) {console.error(`[${clientId}] ${error}`);} else {console.log(`[${clientId}] MP3 保存成功: ${mp3Path}`);}// fs.unlink(tempWavPath, () => {}); // 清理临时文件});});
};// 补充 WAV 头(与 A 项目采样率一致:16000Hz, 16bit, 单声道)
const addWavHeader = (pcmBuffer) => {const sampleRate = 16000; // A 项目 Recorder 中设置的采样率const numChannels = 1; // 单声道const bitDepth = 16; // 16bit 采样// WAV 头总长度固定为 44 字节,后续是 PCM 数据const wavBuffer = Buffer.alloc(44 + pcmBuffer.length);// 写入 RIFF 头(4字节)wavBuffer.write('RIFF', 0);wavBuffer.writeUInt32LE(36 + pcmBuffer.length, 4); // 文件总大小(RIFF 头 + WAV 头 + PCM 数据)wavBuffer.write('WAVE', 8);// 写入 fmt 子块(24字节)wavBuffer.write('fmt ', 12);wavBuffer.writeUInt32LE(16, 16); // fmt 子块大小(固定为 16)wavBuffer.writeUInt16LE(1, 20); // 音频格式(PCM = 1)wavBuffer.writeUInt16LE(numChannels, 22); // 声道数wavBuffer.writeUInt32LE(sampleRate, 24); // 采样率wavBuffer.writeUInt32LE((sampleRate * numChannels * bitDepth) / 8, 28); // 字节率(采样率×声道数×位深/8)wavBuffer.writeUInt16LE((numChannels * bitDepth) / 8, 32); // 块对齐(声道数×位深/8)wavBuffer.writeUInt16LE(bitDepth, 34); // 位深// 写入 data 子块(8字节 + PCM 数据)wavBuffer.write('data', 36);wavBuffer.writeUInt32LE(pcmBuffer.length, 40); // PCM 数据大小// 写入 PCM 数据(关键修复:逐个字节复制)pcmBuffer.copy(wavBuffer, 44); // 将 PCM 数据复制到 WAV 缓冲区的 44 字节之后return wavBuffer;
};// 初始化临时目录
fs.mkdirSync(path.join(__dirname, 'temp'), { recursive: true });
fs.mkdirSync(path.join(__dirname, 'output'), { recursive: true });
console.log('C项目启动,端口: 8082');