Docker 部署 RabbitMQ
快速启动
# Basic run (with the management UI).
# NOTE: both commands below use --name rabbitmq, so they are alternatives —
# run one or the other, not both.
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# With persistence: a named volume for /var/lib/rabbitmq and a custom default user
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=your_password \
-v rabbitmq_data:/var/lib/rabbitmq \
rabbitmq:3-management
# Management UI: http://localhost:15672
# Default username/password: guest/guest (localhost only). This applies to the
# first command; the second one sets RABBITMQ_DEFAULT_USER/PASS, so log in
# with admin/your_password there.
Docker Compose 部署
- 基础配置
- 生产配置
- 集群配置
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
restart: always
ports:
- "5672:5672" # AMQP 端口
- "15672:15672" # 管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: your_password
TZ: Asia/Shanghai
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
# Production single-node setup: management UI, Prometheus metrics endpoint,
# mounted config files and a container health check.
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    restart: always
    hostname: rabbitmq
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI port
      - "15692:15692"  # Prometheus metrics port
    environment:
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-admin}
      # Abort `docker compose up` when the password is unset or empty instead
      # of silently substituting an empty string.
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:?RABBITMQ_PASSWORD must be set}
      RABBITMQ_DEFAULT_VHOST: /
      TZ: Asia/Shanghai
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - ./enabled_plugins:/etc/rabbitmq/enabled_plugins
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 40s
volumes:
  rabbitmq_data:
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3-management
container_name: rabbitmq1
hostname: rabbitmq1
restart: always
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie_string'
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: your_password
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq1_data:/var/lib/rabbitmq
rabbitmq2:
image: rabbitmq:3-management
container_name: rabbitmq2
hostname: rabbitmq2
restart: always
ports:
- "5673:5672"
- "15673:15672"
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie_string'
volumes:
- rabbitmq2_data:/var/lib/rabbitmq
depends_on:
- rabbitmq1
rabbitmq3:
image: rabbitmq:3-management
container_name: rabbitmq3
hostname: rabbitmq3
restart: always
ports:
- "5674:5672"
- "15674:15672"
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie_string'
volumes:
- rabbitmq3_data:/var/lib/rabbitmq
depends_on:
- rabbitmq1
volumes:
rabbitmq1_data:
rabbitmq2_data:
rabbitmq3_data:
集群初始化:
# 将节点 2 加入集群
docker exec rabbitmq2 rabbitmqctl stop_app
docker exec rabbitmq2 rabbitmqctl reset
docker exec rabbitmq2 rabbitmqctl join_cluster rabbit@rabbitmq1
docker exec rabbitmq2 rabbitmqctl start_app
# 将节点 3 加入集群
docker exec rabbitmq3 rabbitmqctl stop_app
docker exec rabbitmq3 rabbitmqctl reset
docker exec rabbitmq3 rabbitmqctl join_cluster rabbit@rabbitmq1
docker exec rabbitmq3 rabbitmqctl start_app
# 查看集群状态
docker exec rabbitmq1 rabbitmqctl cluster_status
自定义配置
rabbitmq.conf
# Network listeners (AMQP and management UI)
listeners.tcp.default = 5672
management.tcp.port = 15672
# Memory and disk limits
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
# NOTE(review): per the RabbitMQ docs the relative disk limit is a multiple of
# total RAM, so 2.0 requires free disk space of twice the RAM — confirm this
# is intended for the target host.
disk_free_limit.relative = 2.0
# Heartbeat timeout
heartbeat = 60
# Logging
log.file.level = info
log.console = true
log.console.level = info
# Queue master placement strategy. This is not a persistence setting: it
# selects which cluster node hosts a newly declared queue's master.
queue_master_locator = min-masters
# Maximum number of connections
connection_max = 65536
# Maximum message size in bytes (134217728 = 128 MiB)
max_message_size = 134217728
# Default vhost, default user, and that user's tags and permissions
default_vhost = /
default_user = admin
default_pass = your_password
default_user_tags.administrator = true
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*
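enabled_plugins(注意:rabbitmq_delayed_message_exchange 是社区插件,官方镜像未内置,需先将与 RabbitMQ 版本匹配的 .ez 文件放入容器的 plugins 目录后才能启用)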
[rabbitmq_management,
rabbitmq_management_agent,
rabbitmq_prometheus,
rabbitmq_shovel,
rabbitmq_shovel_management,
rabbitmq_federation,
rabbitmq_federation_management,
rabbitmq_delayed_message_exchange].
常用命令
# 查看状态
docker exec rabbitmq rabbitmqctl status
# 查看集群状态
docker exec rabbitmq rabbitmqctl cluster_status
# 查看队列列表
docker exec rabbitmq rabbitmqctl list_queues
# 查看详细队列信息
docker exec rabbitmq rabbitmqctl list_queues name messages consumers
# 查看交换机列表
docker exec rabbitmq rabbitmqctl list_exchanges
# 查看绑定关系
docker exec rabbitmq rabbitmqctl list_bindings
# 查看连接
docker exec rabbitmq rabbitmqctl list_connections
# 查看通道
docker exec rabbitmq rabbitmqctl list_channels
# 查看用户
docker exec rabbitmq rabbitmqctl list_users
# 查看虚拟主机
docker exec rabbitmq rabbitmqctl list_vhosts
# 查看权限
docker exec rabbitmq rabbitmqctl list_permissions
用户和权限管理
# 创建用户
docker exec rabbitmq rabbitmqctl add_user myuser mypassword
# 设置用户标签(管理员)
docker exec rabbitmq rabbitmqctl set_user_tags myuser administrator
# 设置用户权限
docker exec rabbitmq rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
# 删除用户
docker exec rabbitmq rabbitmqctl delete_user myuser
# 修改密码
docker exec rabbitmq rabbitmqctl change_password myuser newpassword
# 创建虚拟主机
docker exec rabbitmq rabbitmqctl add_vhost my_vhost
# 删除虚拟主机
docker exec rabbitmq rabbitmqctl delete_vhost my_vhost
# 授予用户对虚拟主机的权限
docker exec rabbitmq rabbitmqctl set_permissions -p my_vhost myuser ".*" ".*" ".*"
队列管理
# Declare a queue. rabbitmqctl has no "declare" command; declaring objects is
# done with rabbitmqadmin, the management plugin's CLI (shipped in the
# -management image). Add "-u <user> -p <password>" when not using guest/guest.
docker exec rabbitmq rabbitmqadmin declare queue name=my_queue durable=true
# Delete a queue
docker exec rabbitmq rabbitmqctl delete_queue my_queue
# Purge a queue
docker exec rabbitmq rabbitmqctl purge_queue my_queue
# Show queue details
docker exec rabbitmq rabbitmqctl list_queues name messages_ready messages_unacknowledged messages
# Show dead-letter configuration. x-dead-letter-* are not list_queues columns;
# they appear inside "arguments" (set at declaration) or come from the
# policy applied to the queue.
docker exec rabbitmq rabbitmqctl list_queues name messages arguments policy
核心概念示例
1. 简单队列(Simple Queue)
// C# 生产者
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
// C# 消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received: {message}");
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
2. 工作队列(Work Queue)
// 生产者 - 发送任务
channel.QueueDeclare(queue: "task_queue",
durable: true, // 持久化队列
exclusive: false,
autoDelete: false,
arguments: null);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 消息持久化
for (int i = 0; i < 10; i++)
{
string message = $"Task {i}";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
}
// Consumer - process tasks.
// Declare the queue with exactly the same properties as the producer. The
// optional parameters of QueueDeclare default to exclusive: true and
// autoDelete: true, so omitting them would not match the producer's
// declaration and the broker would close the channel with an error.
channel.QueueDeclare(queue: "task_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);
// Fair dispatch: at most one unacknowledged message per consumer.
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"Processing: {message}");
    Thread.Sleep(1000); // simulate task processing
    // Acknowledge only after the work is done.
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
    autoAck: false, // manual acknowledgement
    consumer: consumer);
3. 发布/订阅(Publish/Subscribe)
// 生产者 - 发布消息到 fanout 交换机
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
string message = "Log message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
// 消费者 - 订阅消息
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName; // 临时队列
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"Received: {message}");
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
4. 路由(Routing)
// 生产者 - 使用 direct 交换机
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
string severity = "error"; // info, warning, error
string message = "An error occurred";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
// 消费者 - 订阅特定路由键
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
// 订阅多个路由键
var severities = new[] { "error", "warning" };
foreach (var severity in severities)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine($"[{routingKey}] {message}");
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
5. 主题(Topic)
// 生产者 - 使用 topic 交换机
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
string routingKey = "user.order.created"; // 格式: <entity>.<action>.<status>
string message = "New order created";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body);
// 消费者 - 使用通配符订阅
channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
var queueName = channel.QueueDeclare().QueueName;
// * 匹配一个单词
// # 匹配零个或多个单词
var bindingKeys = new[] { "user.order.*", "user.#" };
foreach (var bindingKey in bindingKeys)
{
channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: bindingKey);
}
高级特性
1. 死信队列(DLX)
// Create the dead-letter exchange and the queue that collects dead letters.
// The exchange is declared durable so that it (and its binding to the durable
// queue) survives a broker restart.
channel.ExchangeDeclare(exchange: "dlx_exchange", type: ExchangeType.Direct, durable: true);
// exclusive/autoDelete are spelled out because QueueDeclare defaults both to
// true, which would make the queue connection-private and delete it when the
// connection closes.
channel.QueueDeclare(queue: "dlx_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);
channel.QueueBind(queue: "dlx_queue", exchange: "dlx_exchange", routingKey: "dlx");
// Create the business queue and point it at the dead-letter exchange.
var arguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "dlx_exchange" },
    { "x-dead-letter-routing-key", "dlx" },
    { "x-message-ttl", 10000 } // messages expire after 10 seconds
};
channel.QueueDeclare(queue: "business_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: arguments);
2. 延迟队列
// Delay queue built from a per-queue TTL plus dead-lettering: messages sit in
// delay_queue (which has no consumer) until the TTL expires and are then
// dead-lettered to delayed_exchange, which routes them to target_queue.
var delayArguments = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "delayed_exchange" },
    { "x-dead-letter-routing-key", "delayed" },
    { "x-message-ttl", 30000 } // 30-second delay
};
channel.QueueDeclare(queue: "delay_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: delayArguments);
// Target exchange and queue. The exchange is durable so that it and its
// binding survive a broker restart together with the durable queues.
channel.ExchangeDeclare(exchange: "delayed_exchange", type: ExchangeType.Direct, durable: true);
// exclusive/autoDelete are spelled out because QueueDeclare defaults both to
// true, which would make the queue connection-private and delete it when the
// connection closes.
channel.QueueDeclare(queue: "target_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);
channel.QueueBind(queue: "target_queue", exchange: "delayed_exchange", routingKey: "delayed");
// Publish a delayed message through the default exchange into delay_queue.
var body = Encoding.UTF8.GetBytes("Delayed message");
channel.BasicPublish(exchange: "",
    routingKey: "delay_queue",
    basicProperties: null,
    body: body);
3. 优先级队列
// 创建优先级队列
var arguments = new Dictionary<string, object>
{
{ "x-max-priority", 10 } // 最大优先级
};
channel.QueueDeclare(queue: "priority_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: arguments);
// 发送带优先级的消息
var properties = channel.CreateBasicProperties();
properties.Priority = 5; // 0-10
var body = Encoding.UTF8.GetBytes("Priority message");
channel.BasicPublish(exchange: "",
routingKey: "priority_queue",
basicProperties: properties,
body: body);
4. 消息确认机制
// 生产者确认
channel.ConfirmSelect();
channel.BasicPublish(exchange: "", routingKey: "test", body: body);
if (channel.WaitForConfirms())
{
Console.WriteLine("Message confirmed");
}
else
{
Console.WriteLine("Message nacked");
}
// 异步确认
channel.BasicAcks += (sender, ea) =>
{
Console.WriteLine($"Message {ea.DeliveryTag} confirmed");
};
channel.BasicNacks += (sender, ea) =>
{
Console.WriteLine($"Message {ea.DeliveryTag} nacked");
};
性能监控
# Show node memory usage
docker exec rabbitmq rabbitmqctl status | grep -A 3 memory
# Show per-queue memory usage
docker exec rabbitmq rabbitmqctl list_queues name memory
# Show connections
docker exec rabbitmq rabbitmqctl list_connections name user peer_host peer_port
# Show consumers
docker exec rabbitmq rabbitmqctl list_consumers
# Show unacknowledged messages per queue
docker exec rabbitmq rabbitmqctl list_queues name messages_unacknowledged
# Show publish/deliver rates. message_stats is not a rabbitmqctl column; the
# rates are collected by the management plugin, so query them through
# rabbitmqadmin (add "-u <user> -p <password>" when not using guest/guest).
docker exec rabbitmq rabbitmqadmin list queues name message_stats.publish_details.rate message_stats.deliver_get_details.rate
性能优化
1. 预取数量优化
// 每次预取 1 条消息(公平分发)
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 高吞吐量场景,增加预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
2. 批量操作
// 批量发送
for (int i = 0; i < 1000; i++)
{
channel.BasicPublish(exchange: "", routingKey: "queue", body: body);
}
// 批量确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);
3. 连接池
// 使用单例 Connection,多个 Channel
/// <summary>
/// Hands out channels that all share one lazily created connection to the
/// broker on "localhost".
/// </summary>
public class RabbitMQService
{
    // Created on first access; shared by every instance of this class.
    private static readonly Lazy<IConnection> _connection =
        new Lazy<IConnection>(OpenConnection);

    /// <summary>Builds the factory and opens the shared connection.</summary>
    private static IConnection OpenConnection()
    {
        var connectionFactory = new ConnectionFactory();
        connectionFactory.HostName = "localhost";
        return connectionFactory.CreateConnection();
    }

    /// <summary>Opens a new channel on the shared connection.</summary>
    public IModel CreateChannel() => _connection.Value.CreateModel();
}
最佳实践
可靠性
- ✅ 队列和消息持久化
- ✅ 消费者手动确认(autoAck = false)
- ✅ 生产者确认机制
- ✅ 配置死信队列处理失败消息
- ✅ 设置合理的 TTL
- ✅ 使用集群部署
性能
- ✅ 合理设置预取数量(prefetchCount)
- ✅ 使用批量操作
- ✅ 复用 Connection,多 Channel
- ✅ 避免在消费者中执行耗时操作
- ✅ 使用消息压缩
- ✅ 监控队列积压
安全
- ✅ 修改默认用户密码
- ✅ 使用虚拟主机隔离
- ✅ 最小权限原则
- ✅ 启用 TLS/SSL
- ✅ 限制网络访问
- ✅ 定期备份配置
开发
- ✅ 合理设计交换机和队列
- ✅ 使用有意义的命名
- ✅ 记录消息处理日志
- ✅ 实现幂等性
- ✅ 处理消息重试
- ✅ 监控和告警
常见问题
1. 消息丢失
// Fix: durable queue + persistent messages + publisher confirms.
// exclusive/autoDelete are spelled out because QueueDeclare defaults both to
// true; an exclusive auto-delete queue disappears with its connection, which
// defeats the purpose of making it durable.
channel.QueueDeclare(queue: "durable_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // persist the message itself, not just the queue
channel.ConfirmSelect();
channel.BasicPublish(exchange: "", routingKey: "durable_queue",
    basicProperties: properties, body: body);
// Throws when the broker nacks the message or does not confirm within the
// timeout, so a lost publish cannot go unnoticed (the bool returned by
// WaitForConfirms() was previously discarded).
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
2. 消息积压
# 增加消费者数量
# 增加预取数量
# 检查消费者处理性能
# 查看积压情况
docker exec rabbitmq rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
3. 内存告警
# 清空队列
docker exec rabbitmq rabbitmqctl purge_queue queue_name
# 调整内存水位
docker exec rabbitmq rabbitmqctl set_vm_memory_high_watermark 0.7