引入项目
import (
amqp "github.com/rabbitmq/amqp091-go"
)
建立连接&获取通道
消费者与生产者建立连接、获取通道和声明队列的流程相同。
// 读取环境变量
user := os.Getenv("RABBITMQ_USER")
password := os.Getenv("RABBITMQ_PASSWORD")
host := os.Getenv("RABBITMQ_HOST")
port := os.Getenv("RABBITMQ_PORT")
url := "amqp://" + user + ":" + password + "@" + host + ":" + port + "/"
// 连接到消息队列
conn, err := amqp.Dial(url)
if err != nil {
log.Fatalln("connect to message queue failed: ", err)
}
defer conn.Close()
// 获取通道
ch, err := conn.Channel()
if err != nil {
log.Fatalln("create channel failed: ", err)
}
defer ch.Close()
使用 func (ch *amqp.Channel) Qos(prefetchCount int, prefetchSize int, global bool) 控制通道流量
prefetchCount单次获取未确认信息的最大数量,为0则无限制prefetchSize单次获取未确认信息的最大大小(B),为0则无限制,通常不通过该参数控制流量global为true时Qos限制整条通道,为false时限制每个消耗者(即每个消耗者单次最多获取prefetchCount条消息)
声明队列
使用同一队列的消费者、生产者若要分别声明队列,则使用的参数必须完全一致。若声明时同名队列已存在而参数不同,则RabbitMQ会报错并关闭通道。
// 函数签名
func (ch *amqp.Channel) QueueDeclare(
name string,
durable bool,
autoDelete bool,
exclusive bool,
noWait bool,
args amqp.Table
) (amqp.Queue, error)
name队列名称:可以为空,此时RabbitMQ会自动创建一个临时唯一变量名,通过q.name获取durable是否持久化:服务器关闭时,持久化队列会保存队列中的消息,并在重启时恢复autoDelete是否自动删除:若允许自动删除,则当该队列的最后一个消费者消失时,服务器删除该队列exclusive是否独占队列:若独占队列,则只有当前连接能操作它,当前连接断开时队列删除noWait是否无需等待RabbitMQ服务器返回确认:异步模式,函数立刻返回,若出现错误则由RabbitMQ异步通知args扩展参数:形如amqp.Table{ "x-dead-letter-exchange": "dlx_exchange", }
常用扩展参数:
"x-dead-letter-exchange" : string死信交换机名称"x-dead-letter-routing-key" : string指定发送死信使用的路由键"x-message-ttl" : int32消息自动过期时限(ms),若有配置则进入死信队列"x-max-length" : int32队列最大长度"x-overflow" : string队列满时的策略"drop-head"丢弃最旧"reject-publish"拒绝新消息
"x-queue-type" : string队列类型"classic"经典队列,可选持久化和自动删除,标准FIFO"quorum"仲裁队列,高可靠性的分布式持久消息队列,性能略低,适合生产环境。
其设计目标就是长久存在,必须设置durable = true、autoDelete = false,通常令exclusive = false"stream"流式队列,适用于大规模分发、消息回溯、高吞吐性能、大日志等场景,不适合用于任务队列,不详细列出用法
"x-single-active-consumer" : bool单活消费者,即使有多个消费者,也确保同一时间只有一个消费者在消费,从而保证并发下的消息顺序"x-max-priority" : int32指定队列中消息的最大优先级,后续发送消息时可指定消息优先级,使消息按按优先级高低被消费"x-expires" : int32队列自动过期时间(ms),队列空闲指定时间后会被删除"x-delivery-limit" : int32消费最大重试次数,querum队列专用,消费失败超过该次数则进入死信队列
注意事项:
- 声明队列时自动绑定到direct类型的默认交换机””,在direct交换机中路由键就是队列名。因此后续把队列名设为路由键,向””交换机发送消息,就能直接把消息投递到这个队列。
声明交换机
声明交换机时,如果该交换机不存在就创建,存在就校验参数是否一致,若不一致则关闭通道。
// 函数签名
func (ch *amqp.Channel) ExchangeDeclare(
name string,
kind string,
durable bool,
autoDelete bool,
internal bool,
noWait bool,
args amqp.Table
) error
name交换机名称:交换机的唯一标识kind交换机类型:"direct"精确匹配路由键"fanout"广播消息,不看路由键,所有绑定到该交换机的队列都会收到该消息"topic"模式匹配路由键,*匹配一个单词,#匹配零个或多个单词
durable交换机是否持久化:持久化交换机在RabbitMQ重启后仍然存在autoDelete是否自动删除:当没有任何队列绑定到该交换机,是否自动删除internal是否为内部交换机:内部交换机不接受客户端的publish,只能接收其他交换机路由放入的消息nowait是否不等待服务器确认args扩展参数
常用扩展参数:
"alternate-exchange" : string指定备用交换机,生产者放入消息时,若交换机找不到匹配的队列就将消息转发到备用交换机
绑定队列与交换机
创建一条路由规则,当消息发送到指定交换机且路由键匹配绑定键时,将消息投递到指定队列。
// 函数签名
QueueBind(
name string,
key string,
exchange string,
noWait bool,
args amqp.Table
) error
- name 队列名:需事先声明
- key 绑定键:用于匹配路由键,匹配规则依绑定交换机类型而定
- exchange 交换机名:指定绑定到的交换机
- noWait 是否等待 RabbitMQ 返回确认
- args 拓展参数
绑定交换机与交换机
将一个交换机的输出接到另一个交换机的输入。
// 函数签名
func (ch *amqp.Channel) ExchangeBind(
destination string,
key string,
source string,
noWait bool,
args amqp.Table
) error
destination目标交换机名称:接收消息的交换机key路由键:随消息输入目标交换机的路由键source源交换机:发出消息的交换机noWait是否等待 RabbitMQ 返回确认args扩展参数
发送消息
// 函数签名
func (ch *amqp.Channel) Publish(
exchange string,
key string,
mandatory bool,
immediate bool,
msg amqp.Publishing
) error
exchange目标交换机名称key路由键mandatory消息没有队列接收时,是否返回给生产者:false直接丢弃
若为true,通过ch.NotifyReturn(...)接收返回的消息immediate没有消费者立即消费时是否返回消息:已废弃的功能,总是选falsemsg消息:包含具体内容和元数据,形如amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello"),
}
amqp.Publishing 的常用字段
Body : []byte数据本体DeliveryMode : uint8/enum是否持久化amqp.Persistent或2持久化amqp.Transient或1非持久化
ContentType : string类似http的Content-Type标头,例如"text/plain"、"application/json"Headers : amqp.Table自定义元数据CorrelationId : string用于追踪请求/响应数据链路的唯一标识ReplyTo : string告知接收者回复消息的位置
Tipfunc (ch *amqp.Channel) PublishWithContext(_ context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing) error 的文档中写明:NOTE: this function is equivalent to Channel.Publish. Context is not honoured. 目前不能通过context.Context控制函数执行流程。
接收消息
// 函数签名
func (ch *amqp.Channel) Consume(
queue string,
consumer string,
autoAck bool,
exclusive bool,
noLocal bool,
noWait bool,
args amqp.Table
) (<-chan amqp.Delivery, error)
- queue 要消费的队列名:队列须先声明
- consumer 消费者标识:用于唯一标识该消费者,为空则由RabbitMQ自动生成
- autoAck 是否自动确认消息:推荐设为
false,手动调用func (d amqp.Delivery) Ack(multiple bool) error确认处理成功;Nack(multiple bool, requeue bool) error通知处理失败 - exclusive 是否独占消费:为
true时目标队列只能被当前消费者消费 - noLocal 未被实现,总是设为false
- noWait 是否不等待服务器确认
- args 扩展参数
- 返回值:<-chan amqp.Delivery 返回只读channel,可通过
for msg := range msgs读取
常用扩展参数:
"x-priority" : int32消费者优先级,多个消费者同时消费时,优先级高的先拿取消息
amqp.Delivery 的常用字段
Body : []byteContentType : stringHeaders : amqp.TableDeliveryMode : uint8Priority : uint8CorrelationId : stringReplyTo : stringMessageId : string消息唯一IDTimestamp : time发送时间Exchange : string来源交换机RoutingKey : string发送时使用的路由键Redelivered : bool该消息是不是重新投递的DeliveryTag : uint64供Ack/Nack内部使用的Tag
告知处理结果:
// 函数签名
func (d amqp.Delivery) Ack(multiple bool) error
multiple是否批量确认:若为true则确认所有DeliveryTag <= 当前msg.DeliveryTag的消息;否则只确认当前消息
// 函数签名
func (d amqp.Delivery) Nack(multiple bool, requeue bool) error
multiple是否批量拒绝requeue是否将消息放回队列,不放回则会丢弃或放入死信队列



🥺