DStream与RDD关系是什么

本篇内容主要讲解“DStream与RDD关系是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“DStream与RDD关系是什么”吧!

网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、成都微信小程序、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了淇县免费建站欢迎大家使用!

RDD是怎么生成的?RDD依靠什么生成?RDD生成的依据是什么?Spark Streaming中RDD的执行是否和Spark Core中的RDD执行有所不同?运行之后我们对RDD怎么处理?

RDD本身也是基本的对象,例如说BatchInterval为1秒,那么每一秒都会产生RDD,内存中不能完全容纳该对象。每个BatchInterval的作业执行完后,怎么对已有的RDD进行管理。

ForEachDStream不一定会触发Job的执行,和Job的执行没有关系。

Job的产生是由Spark Streaming框架造成的。

foreachRDD是Spark Streaming的后门,可以直接对RDD进行操作。

DStream就是RDD的模板,后面的DStream与前面的DStream有依赖。

val lines = jsc.socketTextStream("127.0.0.1", 9999)这里产生了SocketInputDStream。

lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()这里由SocketInputDStream转换为FlatMappedDStream,再转换为MappedDStream,再转换为ShuffledDStream,再转换为ForEachDStream。

对于DStream类,源码中是这样解释的。

* DStreams internally is characterized by a few basic properties:
*  - A list of other DStreams that the DStream depends on
*  - A time interval at which the DStream generates an RDD
*  - A function that is used to generate an RDD after each time interval

大致意思是:

1.DStream依赖于其他DStream。

2.每隔BatchDuration,DStream生成一个RDD

3.每隔BatchDuration,DStream内部函数会生成RDD

DStream是从后往前依赖,因为DStream代表Spark Streaming业务逻辑,RDD是从后往前依赖的,DStream是lazy级别的。DStream的依赖关系必须和RDD的依赖关系保持高度一致。

DStream类中generatedRDDs存储着不同时间对应的RDD实例。每一个DStream实例都有自己的generatedRDDs。实际运算的时候,由于是从后往前推,计算只作用于最后一个DStream。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] vargeneratedRDDs = newHashMap[Time, RDD[T]] ()

DStream与RDD关系是什么

generatedRDDs是如何获取的。DStream的getOrCompute方法,先根据时间判断HashMap中是否已存在该时间对应的RDD,如果没有则调用compute得到RDD,并放入到HashMap中。

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final defgetOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if(isTimeValid(time)) {

      valrddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { casenewRDD =>
        // Register the generated RDD for caching and checkpointing
        if(storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD${newRDD.id} for time$time to$storageLevel")
        }
        if(checkpointDuration != null&& (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD${newRDD.id} for time$time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else{
      None
    }
  }
}

拿DStream的子类ReceiverInputDStream来说明compute方法,内部调用了createBlockRDD这个方法。

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override defcompute(validTime: Time): Option[RDD[T]] = {
  valblockRDD = {
    if(validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      newBlockRDD[T](ssc.sc, Array.empty)
    } else{
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      valreceiverTracker = ssc.scheduler.receiverTracker
      valblockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      valinputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

createBlockRDD会返回BlockRDD,由于ReceiverInputDStream没有父依赖,所以自己生成RDD。

private[streaming] defcreateBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if(blockInfos.nonEmpty) {
    valblockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    valareWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if(areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      valisBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      valwalRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      newWriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else{
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if(blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if(WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; "+
            "this is unexpected and data may not be recoverable after driver failures")
        } else{
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      valvalidBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if(validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. "+
          "To prevent such data loss, enabled Write Ahead Log (see programming guide "+
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else{
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if(WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      newWriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else{
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}

再拿DStream的子类MappedDStream来说,这里的compute方法,是调用父RDD的getOrCompute方法获得RDD,再使用map操作。

private[streaming]
classMappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extendsDStream[U](parent.ssc) {

  override defdependencies: List[DStream[_]] = List(parent)

  override defslideDuration: Duration = parent.slideDuration

  override defcompute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

从上面两个DStream的子类,可以说明第一个DStream,即InputDStream的comput方法是自己获取数据并计算的,而其他的DStream是依赖父DStream的,调用父DStream的getOrCompute方法,然后进行计算。

以上说明了对DStream的操作最后作用于对RDD的操作。

接着看下DStream的另一个子类ForEachDStream,发现其compute方法没有任何操作,但是重写了generateJob方法。

private[streaming]
classForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extendsDStream[Unit](parent.ssc) {

  override defdependencies: List[DStream[_]] = List(parent)

  override defslideDuration: Duration = parent.slideDuration

  override defcompute(validTime: Time): Option[RDD[Unit]] = None

  override defgenerateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match{
      caseSome(rdd) =>
        valjobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(newJob(time, jobFunc))
      caseNone => None
    }
  }
}

从Job生成入手,JobGenerator的generateJobs方法,内部调用的DStreamGraph的generateJobs方法。

/** Generate jobs and perform checkpoint for the given `time`.  */
private defgenerateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    //根据特定的时间获取具体的数据
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    //调用DStreamGraph的generateJobs生成Job
    graph.generateJobs(time) // generate jobs using allocated block
  } match{
    caseSuccess(jobs) =>
      valstreamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    caseFailure(e) =>
      jobScheduler.reportError("Error generating jobs for time "+ time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

DStreamGraph的generateJobs方法调用了OutputStream的generateJob方法,OutputStream就是ForEachDStream。

defgenerateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time "+ time)
  valjobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      valjobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated "+ jobs.length + " jobs for time "+ time)
  jobs
}

到此,相信大家对“DStream与RDD关系是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


当前名称:DStream与RDD关系是什么
转载源于:http://scyanting.com/article/pdihce.html