Skip to main content

Docker 部署 RabbitMQ

快速启动

# 基本运行(带管理界面)
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management

# With persistence.
# RabbitMQ keeps its data in a directory named after the node name
# (rabbit@<hostname>). Pin the hostname so a recreated container finds the
# data already stored in the volume instead of starting from an empty database.
docker run -d \
  --name rabbitmq \
  --hostname rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=your_password \
  -v rabbitmq_data:/var/lib/rabbitmq \
  rabbitmq:3-management

# 访问管理界面: http://localhost:15672
# 默认用户名/密码: guest/guest (仅限 localhost)

Docker Compose 部署

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    # Pin the hostname: the node name (rabbit@<hostname>) selects the data
    # directory inside the volume, so a random hostname would orphan the data.
    hostname: rabbitmq
    restart: always
    ports:
      - "5672:5672"   # AMQP port
      - "15672:15672" # management UI port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: your_password
      TZ: Asia/Shanghai
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:

自定义配置

rabbitmq.conf

# 网络配置
listeners.tcp.default = 5672
management.tcp.port = 15672

# 内存和磁盘限制
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
disk_free_limit.relative = 2.0

# 心跳超时
heartbeat = 60

# 日志配置
log.file.level = info
log.console = true
log.console.level = info

# Queue leader placement strategy (this is not a persistence setting):
# min-masters puts each newly declared queue's leader on the node that
# currently hosts the fewest queue leaders.
queue_master_locator = min-masters

# 最大连接数
connection_max = 65536

# 消息大小限制
max_message_size = 134217728

# 默认用户权限
default_vhost = /
default_user = admin
default_pass = your_password
default_user_tags.administrator = true
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*

enabled_plugins（注意：rabbitmq_delayed_message_exchange 是社区插件，官方镜像未内置，需先将对应版本的 .ez 文件放入插件目录后才能启用）

[rabbitmq_management,
rabbitmq_management_agent,
rabbitmq_prometheus,
rabbitmq_shovel,
rabbitmq_shovel_management,
rabbitmq_federation,
rabbitmq_federation_management,
rabbitmq_delayed_message_exchange].

常用命令

# 查看状态
docker exec rabbitmq rabbitmqctl status

# 查看集群状态
docker exec rabbitmq rabbitmqctl cluster_status

# 查看队列列表
docker exec rabbitmq rabbitmqctl list_queues

# 查看详细队列信息
docker exec rabbitmq rabbitmqctl list_queues name messages consumers

# 查看交换机列表
docker exec rabbitmq rabbitmqctl list_exchanges

# 查看绑定关系
docker exec rabbitmq rabbitmqctl list_bindings

# 查看连接
docker exec rabbitmq rabbitmqctl list_connections

# 查看通道
docker exec rabbitmq rabbitmqctl list_channels

# 查看用户
docker exec rabbitmq rabbitmqctl list_users

# 查看虚拟主机
docker exec rabbitmq rabbitmqctl list_vhosts

# 查看权限
docker exec rabbitmq rabbitmqctl list_permissions

用户和权限管理

# 创建用户
docker exec rabbitmq rabbitmqctl add_user myuser mypassword

# 设置用户标签(管理员)
docker exec rabbitmq rabbitmqctl set_user_tags myuser administrator

# 设置用户权限
docker exec rabbitmq rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"

# 删除用户
docker exec rabbitmq rabbitmqctl delete_user myuser

# 修改密码
docker exec rabbitmq rabbitmqctl change_password myuser newpassword

# 创建虚拟主机
docker exec rabbitmq rabbitmqctl add_vhost my_vhost

# 删除虚拟主机
docker exec rabbitmq rabbitmqctl delete_vhost my_vhost

# 授予用户对虚拟主机的权限
docker exec rabbitmq rabbitmqctl set_permissions -p my_vhost myuser ".*" ".*" ".*"

队列管理

# Declare a queue.
# `declare queue` is a rabbitmqadmin subcommand (bundled with the management
# image), not a rabbitmqctl one. It goes through the management HTTP API, so it
# needs the credentials of a management user.
docker exec rabbitmq rabbitmqadmin -u admin -p your_password declare queue name=my_queue durable=true

# 删除队列
docker exec rabbitmq rabbitmqctl delete_queue my_queue

# 清空队列
docker exec rabbitmq rabbitmqctl purge_queue my_queue

# 查看队列详情
docker exec rabbitmq rabbitmqctl list_queues name messages_ready messages_unacknowledged messages

# Inspect dead-letter settings.
# x-dead-letter-exchange / x-dead-letter-routing-key are not list_queues
# columns. They show up inside `arguments` (when set at declaration time) or
# inside `effective_policy_definition` (when set through a policy).
docker exec rabbitmq rabbitmqctl list_queues name messages arguments effective_policy_definition

核心概念示例

1. 简单队列(Simple Queue)

// C# 生产者
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
// C# consumer
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Same declaration as the producer, so either side can start first.
channel.QueueDeclare(queue: "hello",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Received: {message}");
};

channel.BasicConsume(queue: "hello",
                     autoAck: true,
                     consumer: consumer);

// BasicConsume only registers the consumer and returns. Without blocking here
// the program would reach its end, dispose the channel and connection, and
// exit before any message is delivered.
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();

2. 工作队列(Work Queue)

// 生产者 - 发送任务
channel.QueueDeclare(queue: "task_queue",
durable: true, // 持久化队列
exclusive: false,
autoDelete: false,
arguments: null);

var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 消息持久化

for (int i = 0; i < 10; i++)
{
string message = $"Task {i}";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
}
// Consumer - process tasks
// Spell out every flag: the optional-argument overload of QueueDeclare defaults
// to exclusive: true and autoDelete: true, which would not match the producer's
// declaration of "task_queue" and would make the broker reject this one.
channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // fair dispatch

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);

    Console.WriteLine($"Processing: {message}");
    Thread.Sleep(1000); // simulate work

    // Acknowledge only after the work is done so an unfinished task is redelivered.
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: "task_queue",
                     autoAck: false, // manual acknowledgement
                     consumer: consumer);

3. 发布/订阅(Publish/Subscribe)

// 生产者 - 发布消息到 fanout 交换机
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

string message = "Log message";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
// 消费者 - 订阅消息
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

var queueName = channel.QueueDeclare().QueueName; // 临时队列
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received: {message}");
};

channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

4. 路由(Routing)

// 生产者 - 使用 direct 交换机
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

string severity = "error"; // info, warning, error
string message = "An error occurred";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
// 消费者 - 订阅特定路由键
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

var queueName = channel.QueueDeclare().QueueName;

// 订阅多个路由键
var severities = new[] { "error", "warning" };
foreach (var severity in severities)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($"[{routingKey}] {message}");
};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

5. 主题(Topic)

// 生产者 - 使用 topic 交换机
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

string routingKey = "user.order.created"; // 格式: <entity>.<action>.<status>
string message = "New order created";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body);
// 消费者 - 使用通配符订阅
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// * 匹配一个单词
// # 匹配零个或多个单词
var bindingKeys = new[] { "user.order.*", "user.#" };
foreach (var bindingKey in bindingKeys)
{
channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: bindingKey);
}

高级特性

1. 死信队列(DLX)

// Create the dead-letter exchange and its queue.
channel.ExchangeDeclare(exchange: "dlx_exchange", type: ExchangeType.Direct);
// Pass exclusive/autoDelete explicitly: the optional-argument overload defaults
// both to true, so the dead-letter queue would belong to this connection only
// and be removed when it closes, taking the dead-lettered messages with it.
channel.QueueDeclare(queue: "dlx_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: "dlx_queue", exchange: "dlx_exchange", routingKey: "dlx");

// Create the business queue and point it at the dead-letter exchange.
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "dlx_exchange" },
    { "x-dead-letter-routing-key", "dlx" },
    { "x-message-ttl", 10000 } // messages expire after 10 seconds
};

channel.QueueDeclare(queue: "business_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: arguments);

2. 延迟队列

// Delay queue built from a per-queue TTL plus dead-lettering: messages sit in
// "delay_queue" (which has no consumers) until the TTL expires, then get
// dead-lettered to "delayed_exchange" and routed to "target_queue".
var delayArguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "delayed_exchange" },
    { "x-dead-letter-routing-key", "delayed" },
    { "x-message-ttl", 30000 } // 30 second delay
};

channel.QueueDeclare(queue: "delay_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: delayArguments);

// Target queue.
channel.ExchangeDeclare(exchange: "delayed_exchange", type: ExchangeType.Direct);
// Explicit flags: the optional-argument overload defaults to exclusive: true
// and autoDelete: true, which would tie the target queue to this connection
// and drop it, together with any delayed messages, when the connection closes.
channel.QueueDeclare(queue: "target_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);
channel.QueueBind(queue: "target_queue", exchange: "delayed_exchange", routingKey: "delayed");

// Publish a delayed message through the default exchange into the delay queue.
var body = Encoding.UTF8.GetBytes("Delayed message");
channel.BasicPublish(exchange: "",
                     routingKey: "delay_queue",
                     basicProperties: null,
                     body: body);

3. 优先级队列

// 创建优先级队列
var arguments = new Dictionary<string, object>
{
{ "x-max-priority", 10 } // 最大优先级
};

channel.QueueDeclare(queue: "priority_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: arguments);

// 发送带优先级的消息
var properties = channel.CreateBasicProperties();
properties.Priority = 5; // 0-10

var body = Encoding.UTF8.GetBytes("Priority message");
channel.BasicPublish(exchange: "",
routingKey: "priority_queue",
basicProperties: properties,
body: body);

4. 消息确认机制

// 生产者确认
channel.ConfirmSelect();

channel.BasicPublish(exchange: "", routingKey: "test", body: body);

if (channel.WaitForConfirms())
{
Console.WriteLine("Message confirmed");
}
else
{
Console.WriteLine("Message nacked");
}

// 异步确认
channel.BasicAcks += (sender, ea) =>
{
Console.WriteLine($"Message {ea.DeliveryTag} confirmed");
};

channel.BasicNacks += (sender, ea) =>
{
Console.WriteLine($"Message {ea.DeliveryTag} nacked");
};

性能监控

# 查看节点内存使用
docker exec rabbitmq rabbitmqctl status | grep -A 3 memory

# 查看队列内存使用
docker exec rabbitmq rabbitmqctl list_queues name memory

# 查看连接数
docker exec rabbitmq rabbitmqctl list_connections name user peer_host peer_port

# 查看消费者数
docker exec rabbitmq rabbitmqctl list_consumers

# 查看未确认消息
docker exec rabbitmq rabbitmqctl list_queues name messages_unacknowledged

# Show publish / delivery rates per queue.
# message_stats is not a rabbitmqctl list_queues column. Rates are collected by
# the management plugin and are read through rabbitmqadmin (HTTP API), which
# needs the credentials of a management user.
docker exec rabbitmq rabbitmqadmin -u admin -p your_password list queues name message_stats.publish_details.rate message_stats.deliver_get_details.rate

性能优化

1. 预取数量优化

// 每次预取 1 条消息(公平分发)
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

// 高吞吐量场景,增加预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

2. 批量操作

// 批量发送
for (int i = 0; i < 1000; i++)
{
channel.BasicPublish(exchange: "", routingKey: "queue", body: body);
}

// 批量确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

3. 连接池

// One shared Connection for the whole process, many Channels on top of it.
public class RabbitMQService
{
    // Created lazily on first access and then reused by every instance.
    private static readonly Lazy<IConnection> _connection = new(OpenConnection);

    /// <summary>Opens the connection to the broker on "localhost".</summary>
    private static IConnection OpenConnection()
    {
        var connectionFactory = new ConnectionFactory() { HostName = "localhost" };
        return connectionFactory.CreateConnection();
    }

    /// <summary>Creates a new channel on the shared connection.</summary>
    /// <returns>A fresh <see cref="IModel"/> created from the shared connection.</returns>
    public IModel CreateChannel() => _connection.Value.CreateModel();
}

最佳实践

最佳实践

可靠性

  • ✅ 队列和消息持久化
  • ✅ 消费者手动确认(autoAck = false)
  • ✅ 生产者确认机制
  • ✅ 配置死信队列处理失败消息
  • ✅ 设置合理的 TTL
  • ✅ 使用集群部署

性能

  • ✅ 合理设置预取数量(prefetchCount)
  • ✅ 使用批量操作
  • ✅ 复用 Connection,多 Channel
  • ✅ 避免在消费者中执行耗时操作
  • ✅ 使用消息压缩
  • ✅ 监控队列积压

安全

  • ✅ 修改默认用户密码
  • ✅ 使用虚拟主机隔离
  • ✅ 最小权限原则
  • ✅ 启用 TLS/SSL
  • ✅ 限制网络访问
  • ✅ 定期备份配置

开发

  • ✅ 合理设计交换机和队列
  • ✅ 使用有意义的命名
  • ✅ 记录消息处理日志
  • ✅ 实现幂等性
  • ✅ 处理消息重试
  • ✅ 监控和告警

常见问题

1. 消息丢失

// Fix for lost messages: durable queue + persistent messages + publisher confirms.
// Flags are explicit because the optional-argument overload of QueueDeclare
// defaults to exclusive: true and autoDelete: true, which would delete the
// "durable" queue as soon as this connection closes.
channel.QueueDeclare(queue: "durable_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.ConfirmSelect();
channel.BasicPublish(exchange: "", routingKey: "durable_queue",
                     basicProperties: properties, body: body);
// WaitForConfirms returns false when the broker nacked a message. Discarding
// the result would hide exactly the loss this snippet is meant to prevent.
if (!channel.WaitForConfirms())
{
    Console.WriteLine("Message nacked");
}

2. 消息积压

# 增加消费者数量
# 增加预取数量
# 检查消费者处理性能

# 查看积压情况
docker exec rabbitmq rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

3. 内存告警

# 清空队列
docker exec rabbitmq rabbitmqctl purge_queue queue_name

# Raise the memory high watermark on the running node.
# NOTE: a value set through rabbitmqctl lasts only until the node restarts; to
# make it permanent, set vm_memory_high_watermark.relative in rabbitmq.conf.
docker exec rabbitmq rabbitmqctl set_vm_memory_high_watermark 0.7

相关资源