RabbitMQ 从入门到精通(二)

目录 1. 消息如何保障百分之百的投递成功? 1.1 方案一:消息落库,对消息状态进行打标 1.2 方案二:消息的延迟投递,做二次确认,回调检查 2. 幂等性...

1. 消息如何保障百分之百的投递成功?

什么是生产端的可靠性投递?

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的进行消息补偿机制

如果想保障消息百分百投递成功,只做到前三步不一定能够保障。有些时候或者说有些极端情况,比如生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,由于网络闪断导致生产端没有收到应答,此时这条消息就不知道投递成功了还是失败了,所以针对这些情况我们需要做一些补偿机制。

1.1 方案一:消息落库,对消息状态进行打标

  1. 进行数据的入库,比如我们要发送一条订单消息,首先得把业务数据也就是订单信息存库,然后生成一条消息,把消息也进行入库,这条消息应该包含消息状态属性 Create_Date(创建时间),并设置初始标志 比如0,表示消息创建成功,正在发送中
  2. 首先要保证第一步消息都存储成功了,没有出现任何异常情况,然后生产端再进行消息发送。如果失败了就进行快速失败机制
  3. MQ把消息收到的结果应答(confirm)给生产端
  4. 生产端有一个Confirm Listener,去异步的监听Broker回送的响应,从而判断消息是否投递成功,如果成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功
     
    假设第二步OK了,在第三步回送响应时,网络突然出现了闪断,导致生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了

  5. 此时我们需要设置一个规则,比如说消息在入库时候设置一个临界值timeout,5分钟之后如果还是0的状态那就需要把消息抽取出来。这里我们使用的是分布式定时任务,去定时抓取DB中距离消息创建时间超过5分钟的且状态为0的消息。
  6. 把抓取出来的消息进行重新投递(Retry Send),也就是从第二步开始继续往下走
  7. 当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。

针对这种情况如何去做补偿呢,可以有一个补偿系统去查询这些最终失败的消息,然后给出失败的原因,当然这些可能都需要人工去操作。

第一种可靠性投递,在高并发的场景下是否适合?

对于第一种方案,我们需要做两次数据库的持久化操作,在高并发场景下显然数据库存在着性能瓶颈。其实在我们的核心链路中只需要对业务进行入库就可以了,消息就没必要先入库了,我们可以做消息的延迟投递,做二次确认,回调检查。

当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做都可以。

1.2 方案二:消息的延迟投递,做二次确认,回调检查

Upstream Service上游服务也就是生产端,Downstream service下游服务也就是消费端,Callback service就是回调服务。

  1. 先将业务消息进行入库,然后生产端将消息发送出去
  2. 在发送消息之后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递。
  3. 消费端去监听指定队列,将收到的消息进行处理。
  4. 处理完成之后,发送一个confirm消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中。
  5. 上面的Callback service是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息,如果Callback service收到confirm消息,那么就对消息做持久化存储,即将消息持久化到DB中。
  6. 5分钟之后延迟消息发送到MQ了,然后Callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟检查的这条消息我没有找到,你需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去。

这么做的目的是少做了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量。所以能节省数据库的操作就尽量节省,可以异步的进行补偿。

其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其他的步骤都是一个补偿机制。

第二种方案也是互联网大厂更为经典和主流的解决方案。但是若对性能要求不是那么高,第一种方案要更简单

2. 幂等性

2.1 幂等性是什么?

简单来说就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的。

我们可以借鉴数据库的乐观锁机制来举个例子:

  • 首先为表添加一个版本字段version

  • 在执行更新操作前呢,会先去数据库查询这个version

  • 然后执行更新语句,以version作为条件,例如:

    UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1

  • 如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机制来保障幂等性。

2.2 消息端幂等性保障

重复消费问题:

当消费者消费完消息时,在给生产端返回ack时由于网络中断,导致生产端未收到确认信息,该条消息会重新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。

2.2.1 唯一ID+指纹码机制

唯一ID:业务表唯一的主键,如商品ID

指纹码:为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号或者标志位(具体视业务场景而定)

  • 唯一ID+指纹码机制,利用数据库主键去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID and IS_CONSUM= 指纹码
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:根据ID进行分库分表算法路由

整个思路就是首先我们需要根据消息生成一个全局唯一的ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。

将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了。

2.2.2 利用Redis的原子性去实现

这里只提用Redis的原子性去解决MQ幂等性重复消费的问题

注意:MQ的幂等性问题 根本在于的是生产端未正常接收ACK,可能是网络抖动、网络中断导致

我的方案:

MQ消费端在消费开始时 将 ID放入到Redis的BitMap中,MQ生产端每次生产数据时,从Redis的BitMap对应位置若不能取出ID,则生产消息发送,否则不进行消息发送。

但是有人可能会说,万一消费端,生产端Redis命令执行失败了怎么办,虽然又出现重复消费又出现Redis非正常执行命令的可能性极低,但是万一呢?

OK,我们可以在Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。

3. Confirm机制

3.1 如何理解?

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递

的核心保障

确认机制流程图

生产端发送消息到Broker,然后Broker接收到了消息后,进行回送响应,生产端有一个Confirm Listener,去监听应答,当然这个操作是异步进行的,生产端将消息发送出去就可以不用管了,让内部监听器去监听Broker给我们的响应。

3.2 怎么实现?

  • 第一步,在channel上开启确认模式:channel.confirmSelect()
  • 第二步,在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
public class Producer {
    public static void main(String[] args) throws Exception {
        
        //创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //获取Connection
        Connection connection = connectionFactory.newConnection();
        
        //通过connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        //指定我们的消息投递模式
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.save";
        
        //发送一条信息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
        
        //添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            
            @Override
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("-------no ack!---------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("--------ack!----------");
            }
        });
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception{
        //创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //获取Connection
        Connection connection = connectionFactory.newConnection();
        
        //通过connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.#";
        String queueName = "test_confirm_queue"; 
        
        //声明交换机和队列 然后进行绑定和 设置 最后制定路由key
        channel.exchangeDeclare(exchangeName, "topic",true);
        channel.queueDeclare(queueName, true, false, false, null);
        
        channel.queueBind(queueName, exchangeName, routingkey);
        
        //创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true,queueingConsumer); 
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}

运行说明

先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,然后启动生产端,消息被消费端消费,生产端也成功监听到了ACK响应。

4. Return机制

4.1 如何理解?

  • Return Listener 用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去, 然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener!

4.2 如何实现?

  1. 添加return监听:addReturnListener,生产端去监听这些不可达的消息,做一些后续处理,比如说,记录下消息日志,或者及时去跟踪记录,有可能重新设置一下就好了
  2. 发送消息时,设置Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
public class ReturnProducer {
     public static void main(String[] args) throws Exception {
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 通过Connection创建一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_return_exchange";
            //String routingKey = "return.save";
            String routingKeyError = "abc.save";
            
            String msg = "Hello RabbitMQ Return Message";
            //添加return监听
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                        String routingKey, BasicProperties properties, byte[] body) throws IOException {
                    //replyCode:响应码    replyText:响应信息
                    System.err.println("---------handle  return----------");
                    System.err.println("replyCode: " + replyCode);
                    System.err.println("replyText: " + replyText);
                    System.err.println("exchange: " + exchange);
                    System.err.println("routingKey: " + routingKey);
                    //System.err.println("properties: " + properties);
                    System.err.println("body: " + new String(body));
                }

                
            });
            //5 发送一条消息,第三个参数mandatory:必须设置为true
            channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        }
}
public class ReturnConsumer {
    
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        //4 声明交换机和队列,然后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 创建消费者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费者: " + msg);
        }
    }
}

运行说明

先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,然后启动生产端。
由于生产端设置的是一个错误的路由key,所以消费端没有任何打印,而生产端打印了如下内容

如果我们将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。如果我们的路由key设置为正确的,那么消费端能够正确消费,生产端也不会进行任何打印。

  • 发表于 2019-06-08 23:00
  • 阅读 ( 194 )
  • 分类:网络文章

条评论

请先 登录 后评论
不写代码的码农
小编

篇文章

作家榜 »

  1. 小编 文章
返回顶部
部分文章转自于网络,若有侵权请联系我们删除