(版本定制)第12课:SparkStreaming源码解读之Executor容错安全性

本期内容:

创新互联是一家以网络技术公司,为中小企业提供网站维护、成都网站设计、做网站、网站备案、服务器租用、国际域名空间、软件开发、微信小程序开发等企业互联网相关业务,是一家有着丰富的互联网运营推广经验的科技公司,有着多年的网站建站经验,致力于帮助中小企业在互联网让打出自已的品牌和口碑,让企业在互联网上打开一个面向全国乃至全球的业务窗口:建站欢迎联系:028-86922220

    1、Executor的WAL容错机制

    2、消息重放

Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?

原因是计算的时候Spark Streaming是借助于Spark Core上RDD的安全容错的,所以天然的安全可靠的。

Executor的安全容错主要有:

    1、数据副本:

         有两种方式:a.借助底层的BlockManager,BlockManager做备份,通过传入的StorageLevel进行备份。

                              b. WAL方式进行容错。

    2、接受到数据之后,不做副本,但是数据源支持存放,所谓存放就是可以反复的读取源数据。

容错的弊端:耗时间、耗空间。

    

简单的看下源代码:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
  ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
    }
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通过WAL容错
  } else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通过BlockManager进行容错
  }
}
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
tellMaster = true)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
tellMaster = true)
      numRecords = countIterator.count
      putResult
case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
  }
BlockManagerBasedStoreResult(blockId, numRecords)
}

简单流程图:

(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性

参考博客:http://blog.csdn.net/hanburgud/article/details/51471089


当前名称:(版本定制)第12课:SparkStreaming源码解读之Executor容错安全性
当前URL:http://scyanting.com/article/pijshh.html