消息队列
顺序消费
不同消息中间件的解决方案不同。
RocketMQ提供了MessageQueueSelector选择机制,可以使用SelectMessageQueueByHash,是同一个订单发送到 同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息,即保证了发送有序。
RabbitMQ
如何保证消息的可靠性传输(如何处理消息的丢失问题)
生产者丢了数据
RabbitMQ的事务功能
生产者发送数据之前开启RabbitMQ事务(channel.txSelect),然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。
缺点:吞吐量下降,太耗性能。
confirm机制
在生产者那里设置开启confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了RabbitMQ中,RabbitMQ会给你回传一个ack消息,告诉说这个消息ok了。如果RabbitMQ没能处理这个消息,会回调nack接口,告诉说这个消息接收失败,可以重试。而且可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。
不同:事务机制是同步的,提交事务后会阻塞,但confirm机制是异步的,发送消息之后可以发送下一个消息,然后那个消息RabbitMQ接收了之后会异步回调接口通知你这个消息接收到了。
所以一般在生产者端避免数据丢失,都是用confirm机制的。
RabbitMQ丢失了数据
这个必须开启RabbitMQ的持久化,就是消息写入后会持久化到磁盘,RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ还没持久化就挂了,可能导致少量数据会丢失的,但概率较小。
设置持久化有两个步骤:
创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里的数据;
发送消息时,将消息的deliveryMode设置为2,即将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘上去。
必须要同时设置这两个持久化才行,RabbitMQ哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。
而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,也是可以自己重发的。
消费者丢了数据
刚消费到,还没处理,此时进程挂了,比如重启,RabbitMQ认为你都消费了,这数据就丢了。
这时用RabbitMQ提供的ack机制,简单来说,就是关闭RabbitMQ自动ack,可以通过一个api来调用就行,然后每次代码里确保处理完的时候,再程序里ack。这样的话,如果还没处理完,就没有ack,那RabbitMQ就认为还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。
如何保证消息队列的高可用
RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式。
单机模式
生产环境不会用到。
普通集群模式
多台机器上启动多个RabbitMQ实例,每个机器启动一个。但是创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据。消费时,如果连接到了另一个实例,那么那个实例会从queue所在实例上拉取数据过来。
这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。
镜像集群模式
这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,每次写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。
缺点:性能开销较大,消息同步所有机器,导致网络带宽压力和消耗很重;扩展性较差。
RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
如何保证消息不被重复消费(幂等性)
- 在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
- 在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID等)作为去重的依据,避免同一条消息被重复消费;
当下游业务异常时,触发消息队列的重试机制,但存在多个服务在监听该消息队列,可能导致消息重复消费,需保证接口幂等性。
幂等:同样的参数调用接口,结果都是一样的。
强校验
每次消息过来都要拿着订单号+业务场景这样的唯一标识去流水表查,看看有没有这条流水,有就直接return,没有就执行后面的逻辑。
public void process (String orderId) {
try{
// 查询这个订单是否存在这个活动加GMV的流水
Object gmvFlow = getFlowByOrderId("addGmv" + orderId);
if(Object.isNull(gmvFlow)) {
// 不存在流水,去加GMV和加流水 需放在一个事务中
addGmvAndFlow(orderId);
} else {
// 存在流水证明加过了 直接返回
return;
}
} catch (Exception e) {
// 发送异常 触发消息队列的重试机制
}
}弱校验
把这个id + 场景唯一标识,作为Redis的key,放到缓存里面(失效时间看场景),一定时间内的这个消息就去Redis判断。
如何保证消息的顺序性
消息积压
大量消息在MQ里积压了几小时还没解决
一般这个时候,只能操作临时紧急扩容了,具体操作步骤和思路如下:
1)先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉;
2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量;
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue;
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据;
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据;
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息;
设置了过期时间
这个情况下,实际上没啥积压,而是丢了大量的消息。可以采取批量重导,大量积压时,直接丢弃数据,然后等过了高峰期以后,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入MQ里面去,把白天丢的数据给他补回来。
假设1万个订单积压在MQ里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到MQ里去再补一次。
死信队列
DLX,全称为 Dead-Letter-Exchange,当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定, 实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。
延迟队列
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
应用场景:订单系统,用延迟队列处理超时订单;用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
持久化?
使用RabbitMQ的场景
服务间异步通信,顺序消费,定时任务,请求削峰
消息如何路由
消息提供方 -> 路由 -> 一至多个队列
消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过binding key将Exchange和Queue链接在一起;
消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则),常用的Exchange主要分为一下三种:
fanout:它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中;
direct:它会把消息路由到那些binding key与routing key完全匹配的Queue中;
topic:可以通过通配符满足一部分规则就可以传送。它的约定是:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”;
- binding key与routing key一样也是句点号“. ”分隔的字符串;
- binding key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“ * ”用于匹配一个单词,“#”用于匹配多个单词(可以是零个);
