消息队列
为什么使用消息队列?
优势
- 解耦:生产者和消费者无需直接依赖
- 异步处理:提高系统响应速度
- 削峰填谷:处理突发流量
- 可靠性:消息持久化,保证不丢失
- 扩展性:轻松增加消费者处理能力
常见场景
- 邮件/短信发送
- 订单处理
- 日志收集
- 数据同步
- 任务调度
RabbitMQ.Client
安装依赖
dotnet add package RabbitMQ.Client
基本使用
- 配置
- 生产者
- 消费者
// appsettings.json
// RabbitMQ connection settings. The "RabbitMQ" section name is the one read by
// builder.Configuration.GetSection("RabbitMQ") when binding RabbitMQSettings.
// NOTE(review): "your_password" looks like a placeholder; presumably the real credential
// is supplied outside source control — confirm.
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "admin",
"Password": "your_password",
"VirtualHost": "/"
}
}
/// <summary>
/// Strongly typed RabbitMQ connection options.
/// Every property has an initializer, so a freshly constructed instance is usable as-is.
/// </summary>
public class RabbitMQSettings
{
    /// <summary>Broker host name. Initial value: <c>localhost</c>.</summary>
    public string HostName { get; set; } = "localhost";

    /// <summary>Broker port. Initial value: <c>5672</c>.</summary>
    public int Port { get; set; } = 5672;

    /// <summary>Login user name. Initial value: <c>guest</c>.</summary>
    public string UserName { get; set; } = "guest";

    /// <summary>Login password. Initial value: <c>guest</c>.</summary>
    public string Password { get; set; } = "guest";

    /// <summary>Virtual host to connect to. Initial value: <c>/</c>.</summary>
    public string VirtualHost { get; set; } = "/";
}
using RabbitMQ.Client;
using System.Text;
using System.Text.Json;
/// <summary>
/// Abstraction for publishing a message of any type to a named queue.
/// </summary>
public interface IMessagePublisher
{
/// <summary>Publishes <paramref name="message"/> to the queue named <paramref name="queueName"/>.</summary>
/// <typeparam name="T">Type of the message payload.</typeparam>
/// <param name="queueName">Name of the destination queue.</param>
/// <param name="message">The payload to publish.</param>
/// <returns>A task representing the publish operation.</returns>
Task PublishAsync<T>(string queueName, T message);
}
/// <summary>
/// Publishes JSON-serialized messages to durable RabbitMQ queues via the default exchange.
/// </summary>
/// <remarks>
/// A single connection and a single channel are opened in the constructor and held until
/// <see cref="Dispose"/>. Because one instance (and therefore one channel) may be shared by many
/// callers, every channel operation is serialized with a lock: RabbitMQ channels are not safe
/// for concurrent use from multiple threads.
/// </remarks>
public class RabbitMQPublisher : IMessagePublisher, IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly ILogger<RabbitMQPublisher> _logger;

    // Guards every use of _channel, including shutdown in Dispose.
    private readonly object _channelLock = new();

    /// <summary>
    /// Opens the broker connection and a channel using the supplied settings.
    /// </summary>
    /// <param name="settings">Connection options (host, port, credentials, virtual host).</param>
    /// <param name="logger">Logger used to record published messages.</param>
    public RabbitMQPublisher(
        IOptions<RabbitMQSettings> settings,
        ILogger<RabbitMQPublisher> logger)
    {
        _logger = logger;

        var factory = new ConnectionFactory
        {
            HostName = settings.Value.HostName,
            Port = settings.Value.Port,
            UserName = settings.Value.UserName,
            Password = settings.Value.Password,
            VirtualHost = settings.Value.VirtualHost,
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
        };

        _connection = factory.CreateConnection();
        try
        {
            _channel = _connection.CreateModel();
        }
        catch
        {
            // Do not leak the open connection when the channel cannot be created.
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Declares the queue (durable, non-exclusive, not auto-deleted) and publishes
    /// <paramref name="message"/> to it as a persistent UTF-8 JSON message.
    /// </summary>
    /// <typeparam name="T">Type of the message payload.</typeparam>
    /// <param name="queueName">Queue name; also used as the routing key on the default exchange.</param>
    /// <param name="message">Payload to serialize with <c>System.Text.Json</c>.</param>
    /// <returns>An already-completed task; the publish itself runs synchronously.</returns>
    public Task PublishAsync<T>(string queueName, T message)
    {
        // Serialize before taking the lock so a serialization failure never touches the broker
        // and the lock is held only for channel work.
        var json = JsonSerializer.Serialize(message);
        var body = Encoding.UTF8.GetBytes(json);

        lock (_channelLock)
        {
            // Declare the queue; durable so it survives a broker restart.
            _channel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            // Message properties: persistent delivery, JSON content type, unique id, UTC timestamp.
            var properties = _channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.ContentType = "application/json";
            properties.MessageId = Guid.NewGuid().ToString();
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());

            // Empty exchange name = default exchange, which routes by queue name.
            _channel.BasicPublish(
                exchange: "",
                routingKey: queueName,
                basicProperties: properties,
                body: body
            );
        }

        _logger.LogInformation("消息已发布到队列 {Queue}: {Message}", queueName, json);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes and disposes the channel, then the connection.
    /// </summary>
    public void Dispose()
    {
        // Take the lock so the channel is not torn down in the middle of a publish.
        lock (_channelLock)
        {
            _channel?.Close();
            _channel?.Dispose();
            _connection?.Close();
            _connection?.Dispose();
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Text.Json;
/// <summary>
/// Abstraction for subscribing to a named queue with a typed, asynchronous message handler.
/// </summary>
public interface IMessageConsumer
{
/// <summary>Starts consuming messages from the queue named <paramref name="queueName"/>.</summary>
/// <typeparam name="T">Type the handler receives.</typeparam>
/// <param name="queueName">Name of the queue to consume from.</param>
/// <param name="handleMessage">Asynchronous callback that takes a message of type <typeparamref name="T"/>.</param>
void StartConsuming<T>(string queueName, Func<T, Task> handleMessage);
}
/// <summary>
/// Consumes JSON messages from durable RabbitMQ queues with manual acknowledgement.
/// </summary>
/// <remarks>
/// The channel uses a prefetch count of 1, so every delivery MUST end in an ack or a nack;
/// otherwise the broker never sends the next message. Payloads that can never be processed
/// (invalid JSON, or JSON that deserializes to null) are rejected without requeue. Handler
/// failures are requeued.
/// </remarks>
public class RabbitMQConsumer : IMessageConsumer, IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly ILogger<RabbitMQConsumer> _logger;

    /// <summary>
    /// Opens the broker connection and a channel with asynchronous consumer dispatch enabled.
    /// </summary>
    /// <param name="settings">Connection options (host, port, credentials, virtual host).</param>
    /// <param name="logger">Logger used for delivery and failure diagnostics.</param>
    public RabbitMQConsumer(
        IOptions<RabbitMQSettings> settings,
        ILogger<RabbitMQConsumer> logger)
    {
        _logger = logger;

        var factory = new ConnectionFactory
        {
            HostName = settings.Value.HostName,
            Port = settings.Value.Port,
            UserName = settings.Value.UserName,
            Password = settings.Value.Password,
            VirtualHost = settings.Value.VirtualHost,
            DispatchConsumersAsync = true // required for AsyncEventingBasicConsumer
        };

        _connection = factory.CreateConnection();
        try
        {
            _channel = _connection.CreateModel();

            // Prefetch at most one unacknowledged message at a time.
            _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        }
        catch
        {
            // Do not leak the open connection when channel setup fails.
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Declares the queue and starts delivering its messages to <paramref name="handleMessage"/>.
    /// </summary>
    /// <typeparam name="T">Type each JSON payload is deserialized into.</typeparam>
    /// <param name="queueName">Queue to declare (durable) and consume from.</param>
    /// <param name="handleMessage">Asynchronous handler invoked once per successfully deserialized message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handleMessage"/> is null.</exception>
    public void StartConsuming<T>(string queueName, Func<T, Task> handleMessage)
    {
        // Fail fast: a null handler would otherwise fail on every delivery and requeue forever.
        ArgumentNullException.ThrowIfNull(handleMessage);

        _channel.QueueDeclare(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null
        );

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (sender, ea) =>
        {
            // Tracks whether a failure happened before or after deserialization, so that a
            // JsonException thrown by the handler itself is still treated as a handler failure.
            var deserialized = false;
            try
            {
                var json = Encoding.UTF8.GetString(ea.Body.ToArray());
                _logger.LogInformation("收到消息: {Message}", json);

                var message = JsonSerializer.Deserialize<T>(json);
                deserialized = true;

                if (message == null)
                {
                    // A literal JSON "null" can never be handled. Reject it without requeue so it
                    // does not sit unacknowledged and block the channel (prefetch is 1).
                    _logger.LogWarning("消息内容为空,已丢弃");
                    _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
                    return;
                }

                await handleMessage(message);

                _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                _logger.LogInformation("消息处理成功");
            }
            catch (JsonException ex) when (!deserialized)
            {
                // Malformed payload: redelivery cannot fix it, so requeueing would loop forever.
                _logger.LogError(ex, "消息反序列化失败,已丢弃");
                _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消息处理失败");
                // Handler failure: put the message back on the queue for another attempt.
                // NOTE(review): a handler that always fails will be redelivered indefinitely;
                // consider a dead-letter exchange or a retry limit.
                _channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        _channel.BasicConsume(
            queue: queueName,
            autoAck: false, // manual acknowledgement
            consumer: consumer
        );

        _logger.LogInformation("开始消费队列: {Queue}", queueName);
    }

    /// <summary>
    /// Closes and disposes the channel, then the connection.
    /// </summary>
    public void Dispose()
    {
        _channel?.Close();
        _channel?.Dispose();
        _connection?.Close();
        _connection?.Dispose();
    }
}
使用示例
/// <summary>
/// Message payload describing a newly created order.
/// </summary>
public class OrderCreatedMessage
{
    /// <summary>Numeric identifier of the order.</summary>
    public int OrderId { get; set; }

    /// <summary>Name of the customer; starts as the empty string.</summary>
    public string CustomerName { get; set; } = string.Empty;

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAt { get; set; }
}
// Register services: bind the "RabbitMQ" configuration section to RabbitMQSettings,
// then register the publisher and the consumer as singletons.
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddSingleton<IMessagePublisher, RabbitMQPublisher>();
builder.Services.AddSingleton<IMessageConsumer, RabbitMQConsumer>();
/// <summary>
/// API controller that accepts new orders and publishes an <c>order.created</c> message for each.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly IMessagePublisher _publisher;

    /// <summary>Creates the controller with the publisher used to emit order messages.</summary>
    /// <param name="publisher">Message publisher.</param>
    public OrdersController(IMessagePublisher publisher)
    {
        _publisher = publisher;
    }

    /// <summary>
    /// Generates an order id, publishes an <c>OrderCreatedMessage</c> to the
    /// <c>order.created</c> queue and returns the id.
    /// </summary>
    /// <param name="request">Order details read from the request body.</param>
    /// <returns>HTTP 200 with an object containing the generated <c>OrderId</c>.</returns>
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        // Placeholder for real order creation. Random.Shared is the thread-safe shared generator,
        // so no Random instance is allocated per request. The upper bound is exclusive, as before.
        var orderId = Random.Shared.Next(1000, 9999);

        // Publish the order-created message.
        await _publisher.PublishAsync("order.created", new OrderCreatedMessage
        {
            OrderId = orderId,
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount,
            CreatedAt = DateTime.UtcNow
        });

        return Ok(new { OrderId = orderId });
    }
}
/// <summary>
/// Background service that subscribes to the <c>order.created</c> queue and handles each order message.
/// </summary>
public class OrderMessageConsumer : BackgroundService
{
    private readonly IMessageConsumer _consumer;
    private readonly ILogger<OrderMessageConsumer> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="consumer">Queue consumer used to register the subscription.</param>
    /// <param name="logger">Logger for per-order progress messages.</param>
    public OrderMessageConsumer(
        IMessageConsumer consumer,
        ILogger<OrderMessageConsumer> logger)
    {
        (_consumer, _logger) = (consumer, logger);
    }

    /// <summary>
    /// Registers the order handler with the consumer and returns a completed task;
    /// messages are then delivered through the registered callback.
    /// </summary>
    /// <param name="stoppingToken">Host shutdown token (not used by this method).</param>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.StartConsuming<OrderCreatedMessage>("order.created", HandleOrderCreatedAsync);
        return Task.CompletedTask;
    }

    // Handles one order message: logs, simulates the business work, logs completion.
    private async Task HandleOrderCreatedAsync(OrderCreatedMessage order)
    {
        _logger.LogInformation("处理订单: {OrderId}", order.OrderId);

        // Business logic such as sending e-mail or updating stock goes here; simulated by a delay.
        await Task.Delay(1000);

        _logger.LogInformation("订单处理完成: {OrderId}", order.OrderId);
    }
}
// Register the background service that consumes order messages.
builder.Services.AddHostedService<OrderMessageConsumer>();
MassTransit
MassTransit 是一个强大的分布式应用框架,提供了消息总线、状态机、调度等功能。
安装依赖
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
基本配置
- 配置服务
- 发布消息
- 消费消息
// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    // Register the consumers with the container.
    x.AddConsumer<OrderCreatedConsumer>();
    x.AddConsumer<EmailSendingConsumer>();

    // Use RabbitMQ as the transport.
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("admin");
            h.Password("your_password");
        });

        // Receive endpoint for order-created events.
        cfg.ReceiveEndpoint("order-created-queue", e =>
        {
            // Retry policy: 3 retries, 5 seconds apart.
            // Retry must be configured BEFORE the consumer so that it is applied to it.
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

            // Concurrency limit for this endpoint.
            e.UseConcurrencyLimit(10);

            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });

        // Receive endpoint for e-mail sending.
        cfg.ReceiveEndpoint("email-queue", e =>
        {
            e.ConfigureConsumer<EmailSendingConsumer>(context);
        });
    });
});

var app = builder.Build();
/// <summary>
/// Creates orders and announces each one by publishing an <c>OrderCreated</c> event.
/// </summary>
public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<OrderService> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="publishEndpoint">Endpoint used to publish events.</param>
    /// <param name="logger">Logger for order-creation messages.</param>
    public OrderService(IPublishEndpoint publishEndpoint, ILogger<OrderService> logger)
    {
        (_publishEndpoint, _logger) = (publishEndpoint, logger);
    }

    /// <summary>
    /// Generates a new order id, publishes an <c>OrderCreated</c> event built from
    /// <paramref name="request"/>, and logs the id.
    /// </summary>
    /// <param name="request">Source of the customer name and total amount.</param>
    public async Task CreateOrderAsync(CreateOrderRequest request)
    {
        var newOrderId = Guid.NewGuid();

        // Build the event first, then publish it.
        var orderCreated = new OrderCreated
        {
            OrderId = newOrderId,
            CustomerName = request.CustomerName,
            TotalAmount = request.TotalAmount,
            CreatedAt = DateTime.UtcNow
        };
        await _publishEndpoint.Publish(orderCreated);

        _logger.LogInformation("订单已创建: {OrderId}", newOrderId);
    }
}
/// <summary>
/// Event raised when an order has been created. Immutable after initialization.
/// </summary>
public record OrderCreated
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; init; }

    /// <summary>Customer name; the empty string unless set.</summary>
    public string CustomerName { get; init; } = string.Empty;

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; init; }

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAt { get; init; }
}
/// <summary>
/// Consumes <c>OrderCreated</c> events and publishes a follow-up <c>SendWelcomeEmail</c> message.
/// </summary>
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly ILogger<OrderCreatedConsumer> _logger;
    private readonly IPublishEndpoint _publishEndpoint;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger for processing messages.</param>
    /// <param name="publishEndpoint">Endpoint used to publish the follow-up message.</param>
    public OrderCreatedConsumer(
        ILogger<OrderCreatedConsumer> logger,
        IPublishEndpoint publishEndpoint)
    {
        (_logger, _publishEndpoint) = (logger, publishEndpoint);
    }

    /// <summary>
    /// Logs the event, waits 500 ms to simulate work, publishes <c>SendWelcomeEmail</c>,
    /// then logs completion.
    /// </summary>
    /// <param name="context">Consume context carrying the <c>OrderCreated</c> message.</param>
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var order = context.Message;
        _logger.LogInformation("处理订单创建事件: {OrderId}", order.OrderId);

        // Simulated business logic.
        await Task.Delay(500);

        // Follow-up event; the e-mail address is a fixed sample value.
        var welcomeEmail = new SendWelcomeEmail
        {
            CustomerName = order.CustomerName,
            Email = "customer@example.com"
        };
        await _publishEndpoint.Publish(welcomeEmail);

        _logger.LogInformation("订单处理完成: {OrderId}", order.OrderId);
    }
}
/// <summary>
/// Consumes <c>SendWelcomeEmail</c> messages and simulates sending the e-mail.
/// </summary>
public class EmailSendingConsumer : IConsumer<SendWelcomeEmail>
{
    private readonly ILogger<EmailSendingConsumer> _logger;

    /// <summary>Creates the consumer.</summary>
    /// <param name="logger">Logger for send progress.</param>
    public EmailSendingConsumer(ILogger<EmailSendingConsumer> logger) => _logger = logger;

    /// <summary>
    /// Logs the recipient name, waits one second in place of the real send, then logs success.
    /// </summary>
    /// <param name="context">Consume context carrying the <c>SendWelcomeEmail</c> message.</param>
    public async Task Consume(ConsumeContext<SendWelcomeEmail> context)
    {
        var email = context.Message;
        _logger.LogInformation("发送欢迎邮件给: {CustomerName}", email.CustomerName);

        // Real e-mail delivery would go here.
        await Task.Delay(1000);

        _logger.LogInformation("邮件发送成功");
    }
}
/// <summary>
/// Message requesting that a welcome e-mail be sent. Immutable after initialization.
/// </summary>
public record SendWelcomeEmail
{
    /// <summary>Name of the recipient; the empty string unless set.</summary>
    public string CustomerName { get; init; } = string.Empty;

    /// <summary>Recipient e-mail address; the empty string unless set.</summary>
    public string Email { get; init; } = string.Empty;
}
高级特性
1. 请求/响应模式
/// <summary>
/// Request message asking for the details of one order.
/// </summary>
public record GetOrderDetails
{
    /// <summary>Identifier of the order being queried.</summary>
    public Guid OrderId { get; init; }
}
/// <summary>
/// Response message carrying the details of one order.
/// </summary>
public record OrderDetails
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; init; }

    /// <summary>Order status text; the empty string unless set.</summary>
    public string Status { get; init; } = string.Empty;

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; init; }
}
/// <summary>
/// Answers <c>GetOrderDetails</c> requests with an <c>OrderDetails</c> response.
/// </summary>
public class GetOrderDetailsConsumer : IConsumer<GetOrderDetails>
{
    /// <summary>
    /// Responds with sample details (status "Processing", amount 199.99) for the requested order id.
    /// </summary>
    /// <param name="context">Consume context carrying the request and used to send the response.</param>
    public async Task Consume(ConsumeContext<GetOrderDetails> context)
    {
        // Stand-in for a real order lookup; the reply is built inline and sent to the requester.
        await context.RespondAsync(new OrderDetails
        {
            OrderId = context.Message.OrderId,
            Status = "Processing",
            TotalAmount = 199.99M
        });
    }
}
/// <summary>
/// Sends <c>GetOrderDetails</c> requests over the bus and returns the <c>OrderDetails</c> reply.
/// </summary>
public class OrderQueryService
{
    private readonly IRequestClient<GetOrderDetails> _client;

    /// <summary>Creates the service.</summary>
    /// <param name="client">Request client for <c>GetOrderDetails</c>.</param>
    public OrderQueryService(IRequestClient<GetOrderDetails> client) => _client = client;

    /// <summary>Requests the details of the order with the given id.</summary>
    /// <param name="orderId">Identifier of the order to look up.</param>
    /// <returns>The <c>OrderDetails</c> message from the response.</returns>
    public async Task<OrderDetails> GetOrderAsync(Guid orderId)
    {
        var request = new GetOrderDetails { OrderId = orderId };
        var reply = await _client.GetResponse<OrderDetails>(request);
        return reply.Message;
    }
}
// Register the request client: a scoped IRequestClient<GetOrderDetails> created from the bus,
// addressed to queue "get-order-details-queue" with a 30-second timeout.
builder.Services.AddScoped<IRequestClient<GetOrderDetails>>(provider =>
{
var bus = provider.GetRequiredService<IBusControl>();
return bus.CreateRequestClient<GetOrderDetails>(
new Uri("queue:get-order-details-queue"),
timeout: TimeSpan.FromSeconds(30)
);
});
2. 状态机(Saga)
/// <summary>
/// Saga instance holding the per-order data tracked by the order state machine.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    /// <summary>Correlation identifier of this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Name of the state the saga is currently in; starts as the empty string.</summary>
    public string CurrentState { get; set; } = string.Empty;

    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAt { get; set; }
}
// State machine definition.
/// <summary>
/// Order workflow over <c>OrderState</c>: Initial -> AwaitingPayment -> PaymentReceived -> Completed / finalized.
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
/// <summary>Declares the state property, event correlation and the transitions.</summary>
public OrderStateMachine()
{
// The current state is tracked in OrderState.CurrentState.
InstanceState(x => x.CurrentState);
// All three events are correlated to a saga instance by the message's OrderId.
Event(() => OrderCreated, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => PaymentCompleted, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => OrderShipped, x => x.CorrelateById(context => context.Message.OrderId));
// Initial state: on OrderCreated copy the order data onto the saga, move to AwaitingPayment
// and publish an OrderCreatedEvent built from the saga.
// NOTE(review): OrderCreatedEvent is not defined in the visible source — confirm it exists.
Initially(
When(OrderCreated)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.TotalAmount = context.Message.TotalAmount;
context.Saga.CreatedAt = context.Message.CreatedAt;
})
.TransitionTo(AwaitingPayment)
.Publish(context => new OrderCreatedEvent(context.Saga))
);
// AwaitingPayment: on PaymentCompleted move to PaymentReceived and publish a PaymentCompletedEvent.
// NOTE(review): PaymentCompletedEvent is not defined in the visible source — confirm it exists.
During(AwaitingPayment,
When(PaymentCompleted)
.TransitionTo(PaymentReceived)
.Publish(context => new PaymentCompletedEvent(context.Saga))
);
// PaymentReceived: on OrderShipped transition to Completed, then finalize.
// NOTE(review): Finalize() directly follows TransitionTo(Completed), so Completed looks
// transient — confirm this is intended.
During(PaymentReceived,
When(OrderShipped)
.TransitionTo(Completed)
.Finalize()
);
// Presumably marks the instance as completed (eligible for removal) once finalized —
// verify against the MassTransit saga documentation.
SetCompletedWhenFinalized();
}
// States. Initialized with null! here; presumably assigned by the MassTransitStateMachine base — confirm.
public State AwaitingPayment { get; private set; } = null!;
public State PaymentReceived { get; private set; } = null!;
public State Completed { get; private set; } = null!;
// Events, one per message type handled above.
public Event<OrderCreated> OrderCreated { get; private set; } = null!;
public Event<PaymentCompleted> PaymentCompleted { get; private set; } = null!;
public Event<OrderShipped> OrderShipped { get; private set; } = null!;
}
/// <summary>
/// Event signalling that payment for an order has completed.
/// </summary>
public record PaymentCompleted
{
    /// <summary>Identifier of the order that was paid.</summary>
    public Guid OrderId { get; init; }
}
/// <summary>
/// Event signalling that an order has shipped.
/// </summary>
public record OrderShipped
{
    /// <summary>Identifier of the order that shipped.</summary>
    public Guid OrderId { get; init; }
}
// Register the state machine with MassTransit.
builder.Services.AddMassTransit(x =>
{
// Saga instances are kept in the in-memory repository.
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository(); // or use EntityFrameworkRepository
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("admin");
h.Password("your_password");
});
// Configure receive endpoints from the registrations above.
cfg.ConfigureEndpoints(context);
});
});
3. 重试和错误处理
// Retry and resilience middleware for the "order-queue" receive endpoint.
x.UsingRabbitMq((context, cfg) =>
{
    cfg.Host("localhost");

    cfg.ReceiveEndpoint("order-queue", e =>
    {
        // Delayed redelivery (second-level retry): the message is redelivered after
        // 5 s, 15 s and 30 s. Configured ahead of UseMessageRetry so it wraps the retries.
        e.UseDelayedRedelivery(r => r.Intervals(
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(15),
            TimeSpan.FromSeconds(30)
        ));

        // Retry policy: 3 retries, starting at 1 s and growing by 2 s each time.
        e.UseMessageRetry(r =>
        {
            r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
        });

        // Concurrency limit.
        e.UseConcurrencyLimit(10);

        // Rate limit: 100 messages per minute.
        e.UseRateLimit(100, TimeSpan.FromMinutes(1));

        // Circuit breaker.
        e.UseCircuitBreaker(cb =>
        {
            cb.TrackingPeriod = TimeSpan.FromMinutes(1);
            cb.TripThreshold = 15;
            cb.ActiveThreshold = 10;
            cb.ResetInterval = TimeSpan.FromMinutes(5);
        });

        // The consumer is configured LAST: retry and redelivery are only applied to
        // consumers configured after them.
        e.ConfigureConsumer<OrderConsumer>(context);
    });

    // Auto-configure the remaining endpoints with kebab-case names prefixed with "dev".
    // NOTE(review): this call does not configure error queues specifically.
    cfg.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("dev", false));
});
4. 调度任务
/// <summary>
/// Scheduled message asking for the daily report to be generated and sent.
/// </summary>
public record SendDailyReport
{
    /// <summary>Date the report is for.</summary>
    public DateTime ReportDate { get; init; }
}
/// <summary>
/// Consumes <c>SendDailyReport</c> messages.
/// </summary>
public class SendDailyReportConsumer : IConsumer<SendDailyReport>
{
    /// <summary>Stands in for report generation and delivery with a one-second delay.</summary>
    /// <param name="context">Consume context carrying the <c>SendDailyReport</c> message.</param>
    public async Task Consume(ConsumeContext<SendDailyReport> context)
    {
        // Report generation and sending would go here.
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}
// Configure scheduling: register the report consumer and the delayed message scheduler.
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<SendDailyReportConsumer>();
// Registers the delayed message scheduler with the container.
x.AddDelayedMessageScheduler();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost");
// Enables the delayed message scheduler on the bus.
// NOTE(review): presumably needs delayed-delivery support on the broker — confirm.
cfg.UseDelayedMessageScheduler();
cfg.ConfigureEndpoints(context);
});
});
/// <summary>
/// Schedules the <c>SendDailyReport</c> message for the next 09:00 (local time).
/// </summary>
public class ReportScheduler
{
    private readonly IMessageScheduler _scheduler;

    /// <summary>Creates the scheduler wrapper.</summary>
    /// <param name="scheduler">Message scheduler used to schedule the publish.</param>
    public ReportScheduler(IMessageScheduler scheduler) => _scheduler = scheduler;

    /// <summary>
    /// Schedules a <c>SendDailyReport</c> publish for 09:00 today, or 09:00 tomorrow when
    /// that time has already passed. <c>ReportDate</c> is set to today's date.
    /// </summary>
    public async Task ScheduleDailyReportAsync()
    {
        var nineToday = DateTime.Today.AddHours(9);

        // Roll over to tomorrow once today's 09:00 is in the past.
        var runAt = nineToday < DateTime.Now ? nineToday.AddDays(1) : nineToday;

        var report = new SendDailyReport { ReportDate = DateTime.Today };
        await _scheduler.SchedulePublish(runAt, report);
    }
}
最佳实践
最佳实践
常见问题
1. 消息丢失
// Make the message persistent.
properties.Persistent = true;
// Use publisher confirms: enable confirm mode on the channel, publish, then wait for the confirmation.
// BasicPublish(...) is a placeholder for the real publish arguments.
// NOTE(review): the result of WaitForConfirms() is discarded here — presumably it reports
// whether the broker confirmed; verify and handle it.
_channel.ConfirmSelect();
_channel.BasicPublish(...);
_channel.WaitForConfirms();
// Acknowledge manually on the consumer side (second argument false = this delivery only).
_channel.BasicAck(deliveryTag, false);
2. 消息重复
/// <summary>
/// Makes order processing idempotent by remembering the ids of messages already handled.
/// </summary>
/// <remarks>
/// The id set lives in memory only. Access to it is guarded by a lock because
/// <c>HashSet&lt;T&gt;</c> is not safe for concurrent reads and writes, and message handlers may
/// run in parallel. An id is recorded only after processing succeeds, so a failed message can
/// be retried. Two simultaneous deliveries of the same id can still both be processed; a
/// durable store with an atomic insert is needed to rule that out.
/// </remarks>
public class OrderProcessor
{
    private readonly HashSet<string> _processedMessages = new();

    // Guards _processedMessages.
    private readonly object _gate = new();

    /// <summary>
    /// Processes <paramref name="message"/> unless <paramref name="messageId"/> was already handled.
    /// </summary>
    /// <param name="messageId">Unique id of the message, used as the idempotency key.</param>
    /// <param name="message">The order message to process.</param>
    public async Task ProcessAsync(string messageId, OrderMessage message)
    {
        lock (_gate)
        {
            if (_processedMessages.Contains(messageId))
            {
                return; // already processed, skip
            }
        }

        // Process outside the lock; awaiting is not allowed inside a lock block.
        await ProcessOrderAsync(message);

        lock (_gate)
        {
            _processedMessages.Add(messageId);
        }
    }
}
3. 消息顺序
// Use the same routing key for related messages so they keep their order.
var routingKey = $"order.{orderId}";
_channel.BasicPublish(exchange, routingKey, properties, body);
// Or use a single consumer. The call below sets the prefetch count to 1.
// NOTE(review): BasicQos alone does not limit the number of consumers — confirm only one is attached.
_channel.BasicQos(0, 1, false);