Skip to main content

消息队列

为什么使用消息队列?

优势

  • 解耦:生产者和消费者无需直接依赖
  • 异步处理:提高系统响应速度
  • 削峰填谷:处理突发流量
  • 可靠性:消息持久化,保证不丢失
  • 扩展性:轻松增加消费者处理能力

常见场景

  • 邮件/短信发送
  • 订单处理
  • 日志收集
  • 数据同步
  • 任务调度

RabbitMQ.Client

安装依赖

dotnet add package RabbitMQ.Client

基本使用

// appsettings.json
{
"RabbitMQ": {
"HostName": "localhost",
"Port": 5672,
"UserName": "admin",
"Password": "your_password",
"VirtualHost": "/"
}
}
public class RabbitMQSettings
{
public string HostName { get; set; } = "localhost";
public int Port { get; set; } = 5672;
public string UserName { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string VirtualHost { get; set; } = "/";
}

使用示例

// 消息类型
public class OrderCreatedMessage
{
public int OrderId { get; set; }
public string CustomerName { get; set; } = string.Empty;
public decimal TotalAmount { get; set; }
public DateTime CreatedAt { get; set; }
}

// 注册服务
builder.Services.Configure<RabbitMQSettings>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddSingleton<IMessagePublisher, RabbitMQPublisher>();
builder.Services.AddSingleton<IMessageConsumer, RabbitMQConsumer>();

// 发布消息
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IMessagePublisher _publisher;

public OrdersController(IMessagePublisher publisher)
{
_publisher = publisher;
}

[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
// 创建订单逻辑
var orderId = new Random().Next(1000, 9999);

// 发布消息
await _publisher.PublishAsync("order.created", new OrderCreatedMessage
{
OrderId = orderId,
CustomerName = request.CustomerName,
TotalAmount = request.TotalAmount,
CreatedAt = DateTime.UtcNow
});

return Ok(new { OrderId = orderId });
}
}

// 后台服务消费消息
public class OrderMessageConsumer : BackgroundService
{
private readonly IMessageConsumer _consumer;
private readonly ILogger<OrderMessageConsumer> _logger;

public OrderMessageConsumer(
IMessageConsumer consumer,
ILogger<OrderMessageConsumer> logger)
{
_consumer = consumer;
_logger = logger;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.StartConsuming<OrderCreatedMessage>("order.created", async message =>
{
_logger.LogInformation("处理订单: {OrderId}", message.OrderId);

// 发送邮件、更新库存等业务逻辑
await Task.Delay(1000); // 模拟处理

_logger.LogInformation("订单处理完成: {OrderId}", message.OrderId);
});

return Task.CompletedTask;
}
}

// 注册后台服务
builder.Services.AddHostedService<OrderMessageConsumer>();

MassTransit

MassTransit 是一个强大的分布式应用框架,提供了消息总线、状态机、调度等功能。

安装依赖

dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ

基本配置

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
// 添加消费者
x.AddConsumer<OrderCreatedConsumer>();
x.AddConsumer<EmailSendingConsumer>();

// 配置 RabbitMQ
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("admin");
h.Password("your_password");
});

// 配置端点
cfg.ReceiveEndpoint("order-created-queue", e =>
{
e.ConfigureConsumer<OrderCreatedConsumer>(context);

// 重试策略
e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

// 限流
e.UseConcurrencyLimit(10);
});

cfg.ReceiveEndpoint("email-queue", e =>
{
e.ConfigureConsumer<EmailSendingConsumer>(context);
});
});
});

var app = builder.Build();

高级特性

1. 请求/响应模式

// 请求消息
public record GetOrderDetails
{
public Guid OrderId { get; init; }
}

// 响应消息
public record OrderDetails
{
public Guid OrderId { get; init; }
public string Status { get; init; } = string.Empty;
public decimal TotalAmount { get; init; }
}

// 消费者
public class GetOrderDetailsConsumer : IConsumer<GetOrderDetails>
{
public async Task Consume(ConsumeContext<GetOrderDetails> context)
{
// 查询订单
var orderDetails = new OrderDetails
{
OrderId = context.Message.OrderId,
Status = "Processing",
TotalAmount = 199.99M
};

// 响应
await context.RespondAsync(orderDetails);
}
}

// 发送请求
public class OrderQueryService
{
private readonly IRequestClient<GetOrderDetails> _client;

public OrderQueryService(IRequestClient<GetOrderDetails> client)
{
_client = client;
}

public async Task<OrderDetails> GetOrderAsync(Guid orderId)
{
var response = await _client.GetResponse<OrderDetails>(new GetOrderDetails
{
OrderId = orderId
});

return response.Message;
}
}

// 注册请求客户端
builder.Services.AddScoped<IRequestClient<GetOrderDetails>>(provider =>
{
var bus = provider.GetRequiredService<IBusControl>();
return bus.CreateRequestClient<GetOrderDetails>(
new Uri("queue:get-order-details-queue"),
timeout: TimeSpan.FromSeconds(30)
);
});

2. 状态机(Saga)

// 状态机状态
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = string.Empty;
public Guid OrderId { get; set; }
public decimal TotalAmount { get; set; }
public DateTime CreatedAt { get; set; }
}

// 状态机定义
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);

Event(() => OrderCreated, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => PaymentCompleted, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => OrderShipped, x => x.CorrelateById(context => context.Message.OrderId));

Initially(
When(OrderCreated)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.TotalAmount = context.Message.TotalAmount;
context.Saga.CreatedAt = context.Message.CreatedAt;
})
.TransitionTo(AwaitingPayment)
.Publish(context => new OrderCreatedEvent(context.Saga))
);

During(AwaitingPayment,
When(PaymentCompleted)
.TransitionTo(PaymentReceived)
.Publish(context => new PaymentCompletedEvent(context.Saga))
);

During(PaymentReceived,
When(OrderShipped)
.TransitionTo(Completed)
.Finalize()
);

SetCompletedWhenFinalized();
}

public State AwaitingPayment { get; private set; } = null!;
public State PaymentReceived { get; private set; } = null!;
public State Completed { get; private set; } = null!;

public Event<OrderCreated> OrderCreated { get; private set; } = null!;
public Event<PaymentCompleted> PaymentCompleted { get; private set; } = null!;
public Event<OrderShipped> OrderShipped { get; private set; } = null!;
}

// 事件定义
public record PaymentCompleted
{
public Guid OrderId { get; init; }
}

public record OrderShipped
{
public Guid OrderId { get; init; }
}

// 注册状态机
builder.Services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository(); // 或使用 EntityFrameworkRepository

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("admin");
h.Password("your_password");
});

cfg.ConfigureEndpoints(context);
});
});

3. 重试和错误处理

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost");

cfg.ReceiveEndpoint("order-queue", e =>
{
e.ConfigureConsumer<OrderConsumer>(context);

// 重试策略
e.UseMessageRetry(r =>
{
r.Incremental(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2));
});

// 延迟重试
e.UseDelayedRedelivery(r => r.Intervals(
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15),
TimeSpan.FromSeconds(30)
));

// 限流
e.UseConcurrencyLimit(10);

// 速率限制
e.UseRateLimit(100, TimeSpan.FromMinutes(1));

// 断路器
e.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.TripThreshold = 15;
cb.ActiveThreshold = 10;
cb.ResetInterval = TimeSpan.FromMinutes(5);
});
});

// 配置错误队列
cfg.ConfigureEndpoints(context, new KebabCaseEndpointNameFormatter("dev", false));
});

4. 调度任务

// 调度消息
public record SendDailyReport
{
public DateTime ReportDate { get; init; }
}

// 消费者
public class SendDailyReportConsumer : IConsumer<SendDailyReport>
{
public async Task Consume(ConsumeContext<SendDailyReport> context)
{
// 生成并发送报表
await Task.Delay(1000);
}
}

// 配置调度
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<SendDailyReportConsumer>();

x.AddDelayedMessageScheduler();

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost");

cfg.UseDelayedMessageScheduler();

cfg.ConfigureEndpoints(context);
});
});

// 调度任务
public class ReportScheduler
{
private readonly IMessageScheduler _scheduler;

public ReportScheduler(IMessageScheduler scheduler)
{
_scheduler = scheduler;
}

public async Task ScheduleDailyReportAsync()
{
// 每天早上 9 点发送报表
var scheduledTime = DateTime.Today.AddHours(9);
if (scheduledTime < DateTime.Now)
{
scheduledTime = scheduledTime.AddDays(1);
}

await _scheduler.SchedulePublish(scheduledTime, new SendDailyReport
{
ReportDate = DateTime.Today
});
}
}

最佳实践

最佳实践

消息设计

  • ✅ 消息应该是不可变的(使用 record)
  • ✅ 包含足够的上下文信息
  • ✅ 使用有意义的命名(事件用过去式)
  • ✅ 版本化消息结构
  • ✅ 避免在消息中传递大对象

消费者设计

  • ✅ 实现幂等性处理
  • ✅ 保持消费者逻辑简单
  • ✅ 使用异步处理
  • ✅ 记录详细日志
  • ✅ 合理设置预取数量

可靠性

  • ✅ 启用消息持久化
  • ✅ 使用手动确认模式
  • ✅ 配置死信队列
  • ✅ 实现重试机制
  • ✅ 监控队列积压

性能

  • ✅ 批量发布消息
  • ✅ 使用连接池
  • ✅ 合理设置并发数
  • ✅ 避免阻塞操作
  • ✅ 使用分区提高吞吐量

安全

  • ✅ 使用强密码
  • ✅ 限制网络访问
  • ✅ 加密敏感信息
  • ✅ 定期更新依赖
  • ✅ 监控异常访问

常见问题

1. 消息丢失

// 确保消息持久化
properties.Persistent = true;

// 使用发布者确认
_channel.ConfirmSelect();
_channel.BasicPublish(...);
_channel.WaitForConfirms();

// 手动确认
_channel.BasicAck(deliveryTag, false);

2. 消息重复

// 使用消息 ID 实现幂等性
public class OrderProcessor
{
private readonly HashSet<string> _processedMessages = new();

public async Task ProcessAsync(string messageId, OrderMessage message)
{
if (_processedMessages.Contains(messageId))
{
return; // 已处理过,跳过
}

// 处理消息
await ProcessOrderAsync(message);

_processedMessages.Add(messageId);
}
}

3. 消息顺序

// 使用相同的路由键确保顺序
var routingKey = $"order.{orderId}";
_channel.BasicPublish(exchange, routingKey, properties, body);

// 或使用单个消费者
_channel.BasicQos(0, 1, false);

相关资源