RocketMQ笔记:-创新互联
如果微服务是直接相互通过Feign或其他方式互相调用的,那每个微服务都将在自己的代码中实现调用的逻辑,如果被调用端需要提供的参数或者逻辑发生了变动,则调用端可能也需要修改自己的代码。用图来理解,如果每个微服务都需要互相调用,若有n个微服务,则每个微服务都需要实现其他n-1个微服务的调用逻辑。
创新互联建站自2013年创立以来,是专业互联网技术服务公司,拥有项目网站建设、成都网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元泰山做网站,已为上家服务,为泰山各地企业和个人服务,联系电话:18980820575如果使用RocketMQ,将微服务直接的调用转换成调用者直接发送对应的消息给RocketMQ中,而被调用者直接从RocketMQ取属于自己的消息去消费,这样就无需再互相实现对方的调用逻辑了。而且这次请求也从同步变成了异步。
2、流量削峰:由于RocketMQ是一个队列,队列本身就具有缓和队头和队尾速度差的作用,如果没有队列,所有的请求都由生产者直接调用消费者,由于消费者消费请求需要时间,如果消费者是NIO模型,就会为之创建管道,将管道注册进Selector中,如果同时有一万个请求,则消费者的内存会很快溢出,造成消费者崩溃。而如果将这些请求放在MQ的队列中等待,则在MQ中它们只是以一个消息的方式存在,比起管道占用的内存就小了很多,消费者就可以慢慢从MQ中消费消息。
3、异步处理:假设一个买票场景,假设前端请求后端要执行四个任务:前端发送所需信息调用后端接口,后端收到后:
1、支付系统等待用户支付;
2、订单系统修改订单状态为已支付;
3、向用户发送短信通知。
如果不使用消息队列,则可以这样处理:1处理完成后开始2,2处理完成后开始3......(以此类推)如果这样处理,则用户必须要等待四个任务全部都处理完成了,才能得到响应的结果,而且如果其中一个步骤出现了问题,则3个步骤都得回滚重新发起请求。但是这三个步骤并不需要一起完成后再反馈给用户,可以在支付系统收到请求之后,就反馈给用户已提交,剩下的异步处理(分别发送消息给订单系统和通知系统),完成后再通知用户即可,这样就提高了吞吐量。
二、RocketMQ如何保证信息不丢失:什么时候可能会发生信息丢失:
1、生产者发送消息给Broker的时候
2、Broker在进行集群间同步(主从同步)的时候
3、Broker在持久化的时候
4、消费者从Broker取消息的时候
如何设计对应的解决方案,使这四个情况都不会造成消息丢失即可。
对于1,在RocketMQ中可以使用事物消息机制:①:首先生产者发送half消息,即要提交的消息本身。
②:Broker回复一个half消息告诉生产者收到了消息,但是消息会被MQ标记为“暂不能投递”状态,需要生产者的本地事物完成并再发送二次确认的结果:Commit或者Rollback,才可以被投递或回滚。
③:生产者在本地事物完成以后再发送Commit或者Rollback,使之可以被交付或者回滚。
在此期间Broker可以回查生产者本地事物的状态,生产者如果事物还未完成可以发送unknown给Broker,则Broker继续等待,过一会接着回查。如果在此期间生产者断网或重启了,在时间间隔内没有收到确认结果,则会向生产者集群中的任意一个发起回查。
对于2,即在MQ主从消息同步保证消息不丢失:RocketMQ提供了同步双写方式和异步复制方式。
同步双写方式:在生产者发送消息给Master,Master收到消息后将消息复制到Slave,当Slave和Master都写入成功以后,才会像生产者发送收到回复。(此方式不会发生消息丢失,但是性能回避异步复制方式低10%左右)
异步复制方式:在生产者发送消息给Master之后,Master写入成功就会返回成功,Master再异步将消息复制到Slave中去。
对于3,即在MQ进行持久化时保证消息不丢失同步刷盘:在生产者写入RocketMQ的时候,MQ立刻进行刷盘操作,在刷盘成功之后,再给生产者返回成功确认。不会造成消息丢失。
异步刷盘:在生产者写入RocketMQ的时候,MQ在将消息发送任务给另一个负责存盘的线程,存入该线程内存中,内存写入成功就先返回成功确认,再由该存盘线程负责把内存中的消息刷盘。
对于4,即在消费者从MQ取消息的时候保证消息不丢失Consumer需要在收到消息后,执行本地事务,在本地事物完成之后,返回给MQ确认,MQ在收到确认之后,才会移动offset。(MQ在收到确认之后才会移动Offset)
三、如何保证消息消费的幂等性什么是幂等性?
防止消费者重复消费。
所有MQ产品并没有提供主动解决幂等性的机制,需要由消费者自行控制。
RocketMQ给每个消息分配了一个MessageID,该ID可以作为判断幂等的工具。本地事物在存入数据库的时候将该ID存入数据库,再在执行逻辑前检查收到的消息的MessageID是否已经存在数据库中。由于rocketMQ中在并发量很大时的MessageID并不能保证全局唯一。
或者可以自己带一个业务ID,通过该ID来判断。
四、RocketMQ如何保证消息顺序全局有序和局部有序:MQ只需要保证局部有序,不需要保证全局有序。比如对于同一个订单的不同步骤(订单内的步骤间)需要有序,而订单与订单之间的顺序无需关心。
RocketMQ如何实现?在sendMessage时生产者重新生成一个MessageQueueSelector类的对象,但是需重写此类的select方法(根据订单号OrderID用队列的数量mqs.size()取余即可)
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(Listmqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();//对于同一个订单,由于其OrderID相同,因此会发送到同一个队列中
return mqs.get(index);
}
}, orderId);
除了要保证生产者在发送消息给MQ时,还需要保证消费者在取消息时保证从队列中同时取。
由于生产者在写入MQ的时候MQ中存在多个队列,MQ可能将这些消息负载均衡到多个队列中,我们可以通过在发送时多添加一个参数:MessageQueueSelector的对象,并重写select方法。
对于消费者,在从MQ中消费消息时,令消费者一次取出某队列中的所有消息进行消费即可。
五、如何保证消息的高效读写?kafka和RocketMQ使用零拷贝技术:
零拷贝技术有两种方式:1、Mmap方式;2、transFile方式。
使用内存映射在用户程序中操作文件,通过MappedByteBuffer操作对象。
Mmap(Memory Map内存映射)方式适合比较小的文件,通常文件大小在1.5-2G之间。
File file = new File("test.txt");
final RandomAccessFile rw = new RandomAccessFile(file,"rw");
final MappedByteBuffer map = rw.getChannel().map(FileChannel.MapMode.READ_WRITE,0,2048);
map.put("mmap content".getBytes());
rw.close();
kafka使用transFile方式将硬盘数据加载到网卡。
六、使用MQ如何保证分布式事物的最终一致性?1、要保证生产者100%消息投递。事物消息机制。
2、消费者保证幂等消费。唯一ID+业务自己实现。
分布式MQ的三种语义:
at least one:每个算子至少被处理一遍
at most one:每个算子至多被处理一遍
exactly one:每个算子只被处理一遍
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
新闻名称:RocketMQ笔记:-创新互联
URL链接:http://scyanting.com/article/cdsihi.html