RocketMQ中如何实现pushconsumer顺序消费

这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

黔西ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!

顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:

  • 在broker对message queue加锁,保证当前client占有该队列

  • consumer端对MessageQueue加锁,保证当前线程占有该队列

  • consumer端对ProcessQueue加锁,保证当前线程占有该队列

对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:

    // ConsumeMessageOrderlySerivce    
    public void start() {
        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                }
            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
        }
    }

    public synchronized void lockMQPeriodically() {
        if (!this.stopped) {
            // 通过LOCK_BATCH_MQ请求在broker批量锁定mq
            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
        }
    }

ConsumeMessageOrderlyService中的消费请求提交:

    // ConsumeMessageOrderlySerivce  
    public void submitConsumeRequest(
        final List msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) { 
            // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

顺序处理了逻辑:

    // ConsumeMessageOrderlyService.ConsumeRequest
    public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            // 1.获取MessageQueue上的锁
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) { // 循环处理
                        // ...
                        // 单个ConsumeRequest最长处理时间默认60s
                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }

                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                        // 2. 从ProcessQueue顺序获取batchSize个消息
                        List msgs = this.processQueue.takeMessags(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            ConsumeMessageContext consumeMessageContext = null;
                            // ....

                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                // 3. 获取ProcessQueue上的锁
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                // 4. 推给业务处理逻辑
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock(); // 解锁
                            }

                            // ...
                            // 5. 处理消费结果
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false; // ProcessQueue为空,停止本次推送
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

在processConsumeResult中主要会执行2步操作:

  • 在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除

  • 更新OffsetStore

感谢你能够认真阅读完这篇文章,希望小编分享的“RocketMQ中如何实现push consumer顺序消费”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!


文章题目:RocketMQ中如何实现pushconsumer顺序消费
分享路径:http://scyanting.com/article/peihgi.html