kafkahigh-levelconsumer多线程访-创新互联

在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

专注于为中小企业提供成都网站建设、成都网站制作服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业湾里免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上千企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

Java代码  kafka high-level consumer 多线程访

  1.  def hasNext(): Boolean = {

  2.     if(state == FAILED)         //处于FAILED状态时,另外线程访问会直接异常

  3.       throw new IllegalStateException("Iterator is in failed state")

  4.     state match {

  5.       case DONE => false

  6.       case READY => true

  7.       case _ => maybeComputeNext()

  8.     }

  9.   }

  10.   def maybeComputeNext(): Boolean = {

  11.     state = FAILED              //重置了状态

  12.     nextItem = Some(makeNext())

  13.     if(state == DONE) {

  14.       false

  15.     } else {

  16.       state = READY

  17.       true

  18.     }

  19.   }

  20.   下载

  21. protected def makeNext(): MessageAndMetadata[K, V] = {

  22.     var currentDataChunk: FetchedDataChunk = null

  23.     // if we don't have an iterator, get one

  24.     var localCurrent = current.get()

  25.     if(localCurrent == null || !localCurrent.hasNext) {

  26.       if (consumerTimeoutMs < 0)

  27.         currentDataChunk = channel.take             //channel是BlockingQueue这里会阻塞

  28.       else {

  29.         currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)

  30.         if (currentDataChunk == null) {

  31.           // reset state to make the iterator re-iterable

  32.           resetState()

  33.           throw new ConsumerTimeoutException

  34.         }

  35.       }

  36. //省略部分代码

  37. }

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


本文标题:kafkahigh-levelconsumer多线程访-创新互联
文章来源:http://scyanting.com/article/shddj.html