在项目中遇到了将Go侧的实时音频输入流式传输给python侧,在python侧使用ASR模型进行语音转文字,再将文本结果传回Go侧的需求。相比直接使用TCP连接,WebSocket协议更为可靠稳定,而又不像传统http请求/响应模型需要频繁请求,性能更佳,且在python和Go都有完善易用的库实现,很适合流式传输、事件总线等场景。
环境
服务器:python 3.12.12 + fastapi
客户端:Go 1.25.0 + “github.com/coder/websocket”
0. WebSocket协议
WebSocket的生命周期:
- 连接建立:客户端先发送WebSocket握手请求,服务器响应握手请求,即建立起了WebSocket连接
- 连接开放:WebSocket连接建立后,服务器和客户端均可以随时互相发送数据
- 连接关闭:由服务器或客户端发送特定的关闭帧,连接关闭后再进行读写将会产生错误
WebSocket并不总是被正常关闭,连接超时等情况均能导致异常关闭,若不正确处理便会引起异常并导致程序退出。使用WebSocket的难点就在于维护连接稳定。
1. WebSocket 的基本使用
建立连接
from fastapi import FastAPI, WebSocket
app = FastAPI()
# 在客户端向/sample发送握手请求时异步调用
@app.websocket("/sample")
async def sample(ws: WebSocket):
try:
await ws.accept() # 服务器响应握手请求,建立WebSocket连接
# ...
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=PORT)
func initws(ctx context.Context,port int) (*websocket.Conn, error) {
conn, _, err := websocket.Dial(ctx, "ws://127.0.0.1:"+strconv.Itoa(port)+"/sample", nil) // 客户端发送握手请求
if err != nil {
return nil, err
}
log.Printf("ASR websocket connected")
return conn, nil
}
连接开放
@app.websocket("/sample")
async def sample(ws: WebSocket):
try:
await ws.accept()
while True: # 消息循环
msg = await ws.receive()
# fastapi中,receive()方法获得一个字典,包含"type" "text"或"byte"等key
if msg.get("type") == "disconnect": # 接收到关闭帧,退出消息循环
break
data = msg.get("bytes")
if data is None:
continue
# 接收到二进制数据,以下是业务逻辑。
raw_buffer.extend(data)
usable_len = len(raw_buffer) - (len(raw_buffer) % frame_bytes)
if usable_len == 0:
continue
pcm = np.frombuffer(raw_buffer[:usable_len], dtype=np.int16)
del raw_buffer[:usable_len]
samples = pcm.astype(np.float32) / 32768.0
audio_buffer = np.concatenate((audio_buffer, samples))
cut_size = config.CHUNK_SIZE * frame_bytes
while len(audio_buffer) >= cut_size:
chunk = audio_buffer[:cut_size]
audio_buffer = audio_buffer[cut_size:]
for speech_dict, speech_samples in vad_iterator(chunk):
if "start" in speech_dict:
sensevoice_model.reset()
last_text = ""
has_text = False
is_last = "end" in speech_dict
for res in sensevoice_model.streaming_inference(
speech_samples, is_last
):
text = res.get("text", "")
if not text:
continue
has_text = True
last_text = text
await ws.send_json({"text": text, "is_final": False})
# 异步发送ASR结果
if is_last:
if has_text:
await ws.send_json(
{"text": last_text, "is_final": True}
)
has_text = False
last_text = ""
# ... 异常处理
// 流式发送音频数据
go func() {
for {
select {
case <-asrService.ctx.Done():
return
case audioData, ok := <-asrService.AudioCh:
if !ok {
return
}
if len(audioData) == 0 {
continue
}
wctx, cancel := context.WithTimeout(asrService.ctx, 2*time.Second)
err := asrService.conn.Write(wctx, websocket.MessageBinary, audioData)
cancel()
if err != nil {
log.Printf("ASR WebSocket write err: %v", err)
}
}
}
}
// goroutine实时接收ASR结果
go func() {
var reconnectDelay time.Duration
for {
select {
case <-asrService.ctx.Done():
return
default:
}
wctx, cancel := context.WithTimeout(asrService.ctx, 2*time.Second)
mt, msg, err := asrService.conn.Read(wctx) // Read()方法读取一个数据帧
cancel()
if err != nil {
log.Printf("ASR WebSocket read err: %v", err)
return
}
if mt == websocket.MessageText {
var res asrResult
err := json.Unmarshal(msg, &res)
if err != nil {
log.Printf("ASR result unmarshal err: %v", err)
continue
}
select {
case asrService.ResultCh <- res:
default:
}
}
}
连接关闭
if msg.get("type") == "disconnect": # 接收到关闭帧,退出消息循环
break
err = asrService.conn.Close(websocket.StatusNormalClosure, "closed") // 参数为状态码和原因字符串,发送关闭帧
2. 维护WebSocket连接
连接关闭后,再进行读写便会引发exception和error,所以创建WebSocket连接时就应确保服务器进程已完全启动,并在程序运行时持续维护WebSocket连接。
通过监控python端日志确保服务启动
// goroutine转发python端输出,并监控特定日志,通过设置标志位告知服务启动情况
go func() {
scanner := bufio.NewScanner(errPipe)
hasSetServerReady := false
for scanner.Scan() {
if !hasSetServerReady && strings.Contains(scanner.Text(), fmt.Sprintf("Uvicorn running on http://127.0.0.1:%d", port)) {
setServerReady("asr")
hasSetServerReady = true
}
fmt.Println("[python][ASR][stderr]", scanner.Text())
}
}()
心跳机制
python端使用的fastapi库和Go端使用的”github.com/coder/websocket”接收ping帧时均会自动响应并发送pong帧,故只需在Go端定时调用Ping方法。Ping方法内部会处理pong帧,Ping方法正确返回便表示已收到对应 Pong。但需保证此时有goroutine持续调用 Read(),否则控制帧无法被读取处理。
func startHeartbeat(conn *websocket.Conn, interval time.Duration, done chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := conn.Ping(ctx)
cancel()
if err != nil {
log.Println("heartbeat failed:", err)
close(done)
return
}
case <-done:
return
}
}
}
go startHeartbeat(conn, 5*time.Second, done)
重连机制
WebSocket连接可能因为各种原因意外断开,导致心跳协程退出/Go端读写方法返回err != nil/python端抛出exception,程序应进行处理以尽快恢复服务。
python端做好异常处理,清理资源后退出异步函数即可。
但在Go端实现重连机制时出现了问题:多个goroutine均在使用一个websocket进行读写,若连接断开,多个goroutine都会出错并调用重连函数,导致反复断开、重连。因此我正在尝试将WebSocket连接管理逻辑解耦,由单一控制点维护连接,这是AI给出的示例。
func (m *Manager) connectLoop() {
backoff := time.Second
for {
select {
case <-m.ctx.Done():
return
default:
}
log.Println("connecting to", m.url)
conn, _, err := websocket.Dial(m.ctx, m.url, nil)
if err != nil {
log.Println("dial failed:", err)
time.Sleep(backoff)
backoff = min(backoff*2, 30*time.Second)
continue
}
log.Println("connected")
m.setConn(conn)
backoff = time.Second
errCh := make(chan error, 2)
go func() {
errCh <- m.readLoop(conn)
}()
go func() {
errCh <- m.heartbeatLoop(conn)
}()
err := <-errCh
log.Println("connection lost:", err)
m.clearConn()
conn.Close(websocket.StatusInternalError, "reconnect")
}
}



2/100了o(////▽////)q
??
哦内盖…٩(◕‿◕。)۶ (☆ω☆)
可以手把手教我吗
🥺🥺