WebSocket与流式网络通信

在项目中遇到了将Go侧的实时音频输入流式传输给python侧,在python侧使用ASR模型进行语音转文字,再将文本结果传回Go侧的需求。相比直接使用TCP连接,WebSocket协议更为可靠稳定,而又不像传统http请求/响应模型需要频繁请求,性能更佳,且在python和Go都有完善易用的库实现,很适合流式传输、事件总线等场景。

环境

服务器:python 3.12.12 + fastapi
客户端:Go 1.25.0 + “github.com/coder/websocket”

0. WebSocket协议

WebSocket的生命周期:

  1. 连接建立:客户端先发送WebSocket握手请求,服务器响应握手请求,即建立起了WebSocket连接
  2. 连接开放:WebSocket连接建立后,服务器和客户端均可以随时互相发送数据
  3. 连接关闭:由服务器或客户端发送特定的关闭帧,连接关闭后再进行读写将会产生错误

WebSocket并不总是被正常关闭,连接超时等情况均能导致异常关闭,若不正确处理便会引起异常并导致程序退出。使用WebSocket的难点就在于维护连接稳定。

1. WebSocket 的基本使用

建立连接

from fastapi import FastAPI, WebSocket
app = FastAPI()
# 在客户端向/sample发送握手请求时异步调用
@app.websocket("/sample")
async def sample(ws: WebSocket):
    try:
        await ws.accept() # 服务器响应握手请求,建立WebSocket连接
    # ...

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=PORT)
func initws(ctx context.Context,port int) (*websocket.Conn, error) {
	conn, _, err := websocket.Dial(ctx, "ws://127.0.0.1:"+strconv.Itoa(port)+"/sample", nil) // 客户端发送握手请求
	if err != nil {
		return nil, err
	}
	log.Printf("ASR websocket connected")
	return conn, nil
}

连接开放

@app.websocket("/sample")
async def sample(ws: WebSocket):
    try:
        await ws.accept()

        while True: # 消息循环
            msg = await ws.receive()
            # fastapi中,receive()方法获得一个字典,包含"type" "text"或"byte"等key
            if msg.get("type") == "disconnect": # 接收到关闭帧,退出消息循环
                break
            data = msg.get("bytes")
            if data is None:
                continue
            # 接收到二进制数据,以下是业务逻辑。
            raw_buffer.extend(data)
            usable_len = len(raw_buffer) - (len(raw_buffer) % frame_bytes)
            if usable_len == 0:
                continue

            pcm = np.frombuffer(raw_buffer[:usable_len], dtype=np.int16)
            del raw_buffer[:usable_len]

            samples = pcm.astype(np.float32) / 32768.0
            audio_buffer = np.concatenate((audio_buffer, samples))

            cut_size = config.CHUNK_SIZE * frame_bytes
            while len(audio_buffer) >= cut_size:
                chunk = audio_buffer[:cut_size]
                audio_buffer = audio_buffer[cut_size:]

                for speech_dict, speech_samples in vad_iterator(chunk):
                    if "start" in speech_dict:
                        sensevoice_model.reset()
                        last_text = ""
                        has_text = False

                    is_last = "end" in speech_dict
                    for res in sensevoice_model.streaming_inference(
                        speech_samples, is_last
                    ):
                        text = res.get("text", "")
                        if not text:
                            continue
                        has_text = True
                        last_text = text
                        await ws.send_json({"text": text, "is_final": False})
                        # 异步发送ASR结果
                    if is_last:
                        if has_text:
                            await ws.send_json(
                                {"text": last_text, "is_final": True}
                            )
                        has_text = False
                        last_text = ""
    # ... 异常处理
// 流式发送音频数据
go func() {
	for {
		select {
		case <-asrService.ctx.Done():
			return
		case audioData, ok := <-asrService.AudioCh:
			if !ok {
				return
			}
			if len(audioData) == 0 {
				continue
			}
			wctx, cancel := context.WithTimeout(asrService.ctx, 2*time.Second)
			err := asrService.conn.Write(wctx, websocket.MessageBinary, audioData)
			cancel()
			if err != nil {
				log.Printf("ASR WebSocket write err: %v", err)
			}
		}
	}
}
// goroutine实时接收ASR结果
go func() {
	var reconnectDelay time.Duration
	for {
		select {
		case <-asrService.ctx.Done():
			return
		default:
		}
		wctx, cancel := context.WithTimeout(asrService.ctx, 2*time.Second)
		mt, msg, err := asrService.conn.Read(wctx) // Read()方法读取一个数据帧
		cancel()
		if err != nil {
			log.Printf("ASR WebSocket read err: %v", err)
                        return
                }
		if mt == websocket.MessageText {
			var res asrResult
			err := json.Unmarshal(msg, &res)
			if err != nil {
				log.Printf("ASR result unmarshal err: %v", err)
				continue
			}
			select {
				case asrService.ResultCh <- res:
				default:
			}
		}
	}

连接关闭

if msg.get("type") == "disconnect": # 接收到关闭帧,退出消息循环
    break

err = asrService.conn.Close(websocket.StatusNormalClosure, "closed") // 参数为状态码和原因字符串,发送关闭帧

2. 维护WebSocket连接

连接关闭后,再进行读写便会引发exception和error,所以创建WebSocket连接时就应确保服务器进程已完全启动,并在程序运行时持续维护WebSocket连接。

通过监控python端日志确保服务启动

// goroutine转发python端输出,并监控特定日志,通过设置标志位告知服务启动情况
go func() {
	scanner := bufio.NewScanner(errPipe)
	hasSetServerReady := false
	for scanner.Scan() {
		if !hasSetServerReady && strings.Contains(scanner.Text(), fmt.Sprintf("Uvicorn running on http://127.0.0.1:%d", port)) {
			setServerReady("asr")
			hasSetServerReady = true
		}
		fmt.Println("[python][ASR][stderr]", scanner.Text())
	}
}()

心跳机制

python端使用的fastapi库和Go端使用的”github.com/coder/websocket”接收ping帧时均会自动响应并发送pong帧,故只需在Go端定时调用Ping方法。Ping方法内部会处理pong帧,Ping方法正确返回便表示已收到对应 Pong。但需保证此时有goroutine持续调用 Read(),否则控制帧无法被读取处理。

func startHeartbeat(conn *websocket.Conn, interval time.Duration, done chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            err := conn.Ping(ctx)
            cancel()

            if err != nil {
                log.Println("heartbeat failed:", err)
                close(done)
                return
            }

        case <-done:
            return
        }
    }
}
go startHeartbeat(conn, 5*time.Second, done)

重连机制

WebSocket连接可能因为各种原因意外断开,导致心跳协程退出/Go端读写方法返回err != nil/python端抛出exception,程序应进行处理以尽快恢复服务。

python端做好异常处理,清理资源后退出异步函数即可。

但在Go端实现重连机制时出现了问题:多个goroutine均在使用一个websocket进行读写,若连接断开,多个goroutine都会出错并调用重连函数,导致反复断开、重连。因此我正在尝试将WebSocket连接管理逻辑解耦,由单一控制点维护连接,这是AI给出的示例。

func (m *Manager) connectLoop() {
	backoff := time.Second

	for {
		select {
		case <-m.ctx.Done():
			return
		default:
		}

		log.Println("connecting to", m.url)

		conn, _, err := websocket.Dial(m.ctx, m.url, nil)
		if err != nil {
			log.Println("dial failed:", err)
			time.Sleep(backoff)
			backoff = min(backoff*2, 30*time.Second)
			continue
		}

		log.Println("connected")

		m.setConn(conn)
		backoff = time.Second

		errCh := make(chan error, 2)

                go func() {
                    errCh <- m.readLoop(conn)
                }()

                go func() {
                    errCh <- m.heartbeatLoop(conn)
                }()

                err := <-errCh
                log.Println("connection lost:", err)

		m.clearConn()
		conn.Close(websocket.StatusInternalError, "reconnect")
	}
}

TODO

文章标题:WebSocket与流式网络通信
作者:Misaka10233
本文链接:https://www.misaka10233.com/2026/03/04/websocket%e4%b8%8e%e6%b5%81%e5%bc%8f%e7%bd%91%e7%bb%9c%e9%80%9a%e4%bf%a1/

评论

  1. 渚花汐拾
    2 月前
    2026-3-04 23:10:50

    2/100了o(////▽////)q

    • 博主
      渚花汐拾
      2 月前
      2026-3-05 8:57:16

      ??

      • 渚花汐拾
        Misaka10233
        4 周前
        2026-3-22 9:44:46

        哦内盖…٩(◕‿◕。)۶ (☆ω☆)

  2. 林佑嘉
    4 周前
    2026-3-22 23:57:44

    可以手把手教我吗

    • 博主
      林佑嘉
      1 周前
      2026-4-11 16:18:39

      🥺🥺

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇