RabbitMQ in Go

引入项目

import (
    amqp "github.com/rabbitmq/amqp091-go"
)

建立连接&获取通道

消费者与生产者建立连接、获取通道和声明队列的流程相同。

	// 读取环境变量
	user := os.Getenv("RABBITMQ_USER")
	password := os.Getenv("RABBITMQ_PASSWORD")
	host := os.Getenv("RABBITMQ_HOST")
	port := os.Getenv("RABBITMQ_PORT")
	url := "amqp://" + user + ":" + password + "@" + host + ":" + port + "/"
	// 连接到消息队列
	conn, err := amqp.Dial(url)
	if err != nil {
		log.Fatalln("connect to message queue failed: ", err)
	}
	defer conn.Close()

	// 获取通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalln("create channel failed: ", err)
	}
	defer ch.Close()

使用 func (ch *amqp.Channel) Qos(prefetchCount int, prefetchSize int, global bool) 控制通道流量

  • prefetchCount 单次获取未确认信息的最大数量,为0则无限制
  • prefetchSize 单次获取未确认信息的最大大小(B),为0则无限制,通常不通过该参数控制流量
  • globaltrue时Qos限制整条通道,为false时限制每个消耗者(即每个消耗者单次最多获取prefetchCount条消息)

声明队列

使用同一队列的消费者、生产者若要分别声明队列,则使用的参数必须完全一致。若声明时同名队列已存在而参数不同,则RabbitMQ会报错并关闭通道。

// 函数签名
func (ch *amqp.Channel) QueueDeclare(
    name string, 
    durable bool, 
    autoDelete bool, 
    exclusive bool, 
    noWait bool, 
    args amqp.Table
) (amqp.Queue, error)
  • name 队列名称:可以为空,此时RabbitMQ会自动创建一个临时唯一变量名,通过 q.name 获取
  • durable 是否持久化:服务器关闭时,持久化队列会保存队列中的消息,并在重启时恢复
  • autoDelete 是否自动删除:若允许自动删除,则当该队列的最后一个消费者消失时,服务器删除该队列
  • exclusive 是否独占队列:若独占队列,则只有当前连接能操作它,当前连接断开时队列删除
  • noWait 是否无需等待RabbitMQ服务器返回确认:异步模式,函数立刻返回,若出现错误则由RabbitMQ异步通知
  • args 扩展参数:形如 amqp.Table{ "x-dead-letter-exchange": "dlx_exchange", }

常用扩展参数:

  • "x-dead-letter-exchange" : string 死信交换机名称
  • "x-dead-letter-routing-key" : string 指定发送死信使用的路由键
  • "x-message-ttl" : int32 消息自动过期时限(ms),若有配置则进入死信队列
  • "x-max-length" : int32 队列最大长度
  • "x-overflow" : string 队列满时的策略
    • "drop-head" 丢弃最旧
    • "reject-publish" 拒绝新消息
  • "x-queue-type" : string 队列类型
    • "classic" 经典队列,可选持久化和自动删除,标准FIFO
    • "quorum" 仲裁队列,高可靠性的分布式持久消息队列,性能略低,适合生产环境。
      其设计目标就是长久存在,必须设置durable = trueautoDelete = false ,通常令 exclusive = false
    • "stream" 流式队列,适用于大规模分发、消息回溯、高吞吐性能、大日志等场景,不适合用于任务队列,不详细列出用法
  • "x-single-active-consumer" : bool 单活消费者,即使有多个消费者,也确保同一时间只有一个消费者在消费,从而保证并发下的消息顺序
  • "x-max-priority" : int32 指定队列中消息的最大优先级,后续发送消息时可指定消息优先级,使消息按按优先级高低被消费
  • "x-expires" : int32 队列自动过期时间(ms),队列空闲指定时间后会被删除
  • "x-delivery-limit" : int32 消费最大重试次数,querum队列专用,消费失败超过该次数则进入死信队列

注意事项:

  • 声明队列时自动绑定到direct类型的默认交换机””,在direct交换机中路由键就是队列名。因此后续把队列名设为路由键,向””交换机发送消息,就能直接把消息投递到这个队列。

声明交换机

声明交换机时,如果该交换机不存在就创建,存在就校验参数是否一致,若不一致则关闭通道。

// 函数签名
func (ch *amqp.Channel) ExchangeDeclare(
    name string, 
    kind string, 
    durable bool, 
    autoDelete bool, 
    internal bool,
    noWait bool, 
    args amqp.Table
) error
  • name 交换机名称:交换机的唯一标识
  • kind 交换机类型:
    • "direct" 精确匹配路由键
    • "fanout" 广播消息,不看路由键,所有绑定到该交换机的队列都会收到该消息
    • "topic" 模式匹配路由键,*匹配一个单词,#匹配零个或多个单词
  • durable 交换机是否持久化:持久化交换机在RabbitMQ重启后仍然存在
  • autoDelete 是否自动删除:当没有任何队列绑定到该交换机,是否自动删除
  • internal 是否为内部交换机:内部交换机不接受客户端的publish,只能接收其他交换机路由放入的消息
  • nowait 是否不等待服务器确认
  • args 扩展参数

常用扩展参数:

  • "alternate-exchange" : string 指定备用交换机,生产者放入消息时,若交换机找不到匹配的队列就将消息转发到备用交换机

绑定队列与交换机

创建一条路由规则,当消息发送到指定交换机且路由键匹配绑定键时,将消息投递到指定队列。

// 函数签名
QueueBind(
    name string, 
    key string, 
    exchange string, 
    noWait bool, 
    args amqp.Table
) error
  • name 队列名:需事先声明
  • key 绑定键:用于匹配路由键,匹配规则依绑定交换机类型而定
  • exchange 交换机名:指定绑定到的交换机
  • noWait 是否等待 RabbitMQ 返回确认
  • args 拓展参数

绑定交换机与交换机

将一个交换机的输出接到另一个交换机的输入。

// 函数签名
func (ch *amqp.Channel) ExchangeBind(
    destination string, 
    key string, 
    source string, 
    noWait bool, 
    args amqp.Table
) error
  • destination 目标交换机名称:接收消息的交换机
  • key 路由键:随消息输入目标交换机的路由键
  • source 源交换机:发出消息的交换机
  • noWait 是否等待 RabbitMQ 返回确认
  • args 扩展参数

发送消息

// 函数签名
func (ch *amqp.Channel) Publish(
    exchange string, 
    key string, 
    mandatory bool, 
    immediate bool, 
    msg amqp.Publishing
) error
  • exchange 目标交换机名称
  • key 路由键
  • mandatory 消息没有队列接收时,是否返回给生产者:false直接丢弃
    若为true,通过 ch.NotifyReturn(...) 接收返回的消息
  • immediate 没有消费者立即消费时是否返回消息:已废弃的功能,总是选false
  • msg 消息:包含具体内容和元数据,形如
    amqp.Publishing{
    ContentType: "text/plain",
    Body: []byte("hello"),
    }

amqp.Publishing 的常用字段

  • Body : []byte 数据本体
  • DeliveryMode : uint8/enum 是否持久化
    • amqp.Persistent2 持久化
    • amqp.Transient1 非持久化
  • ContentType : string 类似http的Content-Type标头,例如"text/plain"
    "application/json"
  • Headers : amqp.Table 自定义元数据
  • CorrelationId : string 用于追踪请求/响应数据链路的唯一标识
  • ReplyTo : string 告知接收者回复消息的位置

Tip
func (ch *amqp.Channel) PublishWithContext(_ context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error 的文档中写明:NOTE: this function is equivalent to Channel.Publish. Context is not honoured. 目前不能通过context.Context控制函数执行流程。

接收消息

// 函数签名
func (ch *amqp.Channel) Consume(
    queue string, 
    consumer string, 
    autoAck bool, 
    exclusive bool, 
    noLocal bool, 
    noWait bool, 
    args amqp.Table
) (<-chan amqp.Delivery, error)
  • queue 要消费的队列名:队列须先声明
  • consumer 消费者标识:用于唯一标识该消费者,为空则由RabbitMQ自动生成
  • autoAck 是否自动确认消息:推荐设为false,手动调用func (d amqp.Delivery) Ack(multiple bool) error 确认处理成功;Nack(multiple bool, requeue bool) error 通知处理失败
  • exclusive 是否独占消费:为true时目标队列只能被当前消费者消费
  • noLocal 未被实现,总是设为false
  • noWait 是否不等待服务器确认
  • args 扩展参数
  • 返回值:<-chan amqp.Delivery 返回只读channel,可通过for msg := range msgs读取

常用扩展参数:

  • "x-priority" : int32 消费者优先级,多个消费者同时消费时,优先级高的先拿取消息

amqp.Delivery 的常用字段

  • Body : []byte
  • ContentType : string
  • Headers : amqp.Table
  • DeliveryMode : uint8
  • Priority : uint8
  • CorrelationId : string
  • ReplyTo : string
  • MessageId : string 消息唯一ID
  • Timestamp : time 发送时间
  • Exchange : string 来源交换机
  • RoutingKey : string 发送时使用的路由键
  • Redelivered : bool 该消息是不是重新投递的
  • DeliveryTag : uint64Ack / Nack 内部使用的Tag

告知处理结果:

// 函数签名
func (d amqp.Delivery) Ack(multiple bool) error
  • multiple 是否批量确认:若为 true 则确认所有DeliveryTag <= 当前msg.DeliveryTag的消息;否则只确认当前消息
// 函数签名
func (d amqp.Delivery) Nack(multiple bool, requeue bool) error
  • multiple 是否批量拒绝
  • requeue 是否将消息放回队列,不放回则会丢弃或放入死信队列
文章标题:RabbitMQ in Go
作者:Misaka10233
本文链接:https://www.misaka10233.com/2026/04/11/rabbitmq-in-go/

评论

  1. 林佑嘉
    2 天前
    2026-4-17 19:19:23

    🥺

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇