📨 消息队列

微服务消息传递模式:异步通信架构设计

深入探讨微服务架构中的消息传递模式,包括发布订阅模式、点对点模式、消息路由等异步通信技术

一、消息传递概述

在微服务架构中,消息传递是实现服务间松耦合通信的关键技术,支持异步处理和事件驱动架构。

二、点对点模式

点对点模式中,消息由单个消费者处理,适用于任务分发场景。

// RabbitMQ 点对点模式
public class TaskProducer
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public void SendTask(string message)
    {
        _channel.QueueDeclare(queue: "tasks",
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        var body = Encoding.UTF8.GetBytes(message);
        _channel.BasicPublish(exchange: "",
                             routingKey: "tasks",
                             basicProperties: null,
                             body: body);
    }
}

三、发布订阅模式

发布订阅模式中,消息可以被多个消费者订阅和处理。

// RabbitMQ 发布订阅模式
public class EventPublisher
{
    public void PublishOrderCreated(OrderCreatedEvent @event)
    {
        var exchangeName = "order_events";
        _channel.ExchangeDeclare(exchange: exchangeName,
                                type: ExchangeType.Fanout);

        var message = JsonConvert.SerializeObject(@event);
        var body = Encoding.UTF8.GetBytes(message);
        
        _channel.BasicPublish(exchange: exchangeName,
                             routingKey: "",
                             basicProperties: null,
                             body: body);
    }
}

四、主题模式

主题模式支持基于规则的消息路由,消费者可以订阅特定主题。

// Kafka 主题模式
public class KafkaProducer
{
    private readonly IProducer<string, string> _producer;

    public async Task ProduceAsync(string topic, string message)
    {
        var result = await _producer.ProduceAsync(topic, 
            new Message<string, string> { Value = message });
        Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
    }
}

五、请求响应模式

请求响应模式支持同步风格的异步通信。

// 使用 RabbitMQ 实现请求响应
public async Task<string> RequestData(string request)
{
    var correlationId = Guid.NewGuid().ToString();
    var replyQueue = _channel.QueueDeclare().QueueName;
    
    var props = _channel.CreateBasicProperties();
    props.CorrelationId = correlationId;
    props.ReplyTo = replyQueue;
    
    _channel.BasicPublish(exchange: "",
                         routingKey: "request_queue",
                         basicProperties: props,
                         body: Encoding.UTF8.GetBytes(request));
    
    // 等待响应...
}

六、消息持久化与可靠性

确保消息在系统故障时不丢失。

七、消息幂等性

保证消息重复处理时结果一致。

public async Task ProcessMessage(Message message)
{
    // 使用消息ID实现幂等性
    if (await _messageTracker.HasProcessed(message.Id))
    {
        return; // 已处理过,直接返回
    }
    
    // 处理消息业务逻辑
    await _businessService.Process(message);
    
    // 标记消息已处理
    await _messageTracker.MarkProcessed(message.Id);
}

八、消息编排与Saga

使用消息驱动实现分布式事务。

九、消息队列选型对比

对比RabbitMQ、Kafka、ActiveMQ等消息队列的特点和适用场景。

十、最佳实践总结

包括消息格式设计、错误处理、监控告警等方面的最佳实践。