一、消息传递概述
在微服务架构中,消息传递是实现服务间松耦合通信的关键技术,支持异步处理和事件驱动架构。
二、点对点模式
点对点模式中,消息由单个消费者处理,适用于任务分发场景。
// RabbitMQ 点对点模式
public class TaskProducer
{
private readonly IConnection _connection;
private readonly IModel _channel;
public void SendTask(string message)
{
_channel.QueueDeclare(queue: "tasks",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: "",
routingKey: "tasks",
basicProperties: null,
body: body);
}
}
三、发布订阅模式
发布订阅模式中,消息可以被多个消费者订阅和处理。
// RabbitMQ 发布订阅模式
public class EventPublisher
{
public void PublishOrderCreated(OrderCreatedEvent @event)
{
var exchangeName = "order_events";
_channel.ExchangeDeclare(exchange: exchangeName,
type: ExchangeType.Fanout);
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: exchangeName,
routingKey: "",
basicProperties: null,
body: body);
}
}
四、主题模式
主题模式支持基于规则的消息路由,消费者可以订阅特定主题。
// Kafka 主题模式
public class KafkaProducer
{
private readonly IProducer<string, string> _producer;
public async Task ProduceAsync(string topic, string message)
{
var result = await _producer.ProduceAsync(topic,
new Message<string, string> { Value = message });
Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
}
五、请求响应模式
请求响应模式支持同步风格的异步通信。
// 使用 RabbitMQ 实现请求响应
public async Task<string> RequestData(string request)
{
var correlationId = Guid.NewGuid().ToString();
var replyQueue = _channel.QueueDeclare().QueueName;
var props = _channel.CreateBasicProperties();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueue;
_channel.BasicPublish(exchange: "",
routingKey: "request_queue",
basicProperties: props,
body: Encoding.UTF8.GetBytes(request));
// 等待响应...
}
六、消息持久化与可靠性
确保消息在系统故障时不丢失。
七、消息幂等性
保证消息重复处理时结果一致。
public async Task ProcessMessage(Message message)
{
// 使用消息ID实现幂等性
if (await _messageTracker.HasProcessed(message.Id))
{
return; // 已处理过,直接返回
}
// 处理消息业务逻辑
await _businessService.Process(message);
// 标记消息已处理
await _messageTracker.MarkProcessed(message.Id);
}
八、消息编排与Saga
使用消息驱动实现分布式事务。
九、消息队列选型对比
对比RabbitMQ、Kafka、ActiveMQ等消息队列的特点和适用场景。
十、最佳实践总结
包括消息格式设计、错误处理、监控告警等方面的最佳实践。