9.sparkcore之共享变量-创新互联
简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
10年积累的成都网站建设、成都做网站经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计后付款的网站建设流程,更有龙潭免费网站建设让你可以放心的选择与我们合作。- 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
- 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
- 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
- 累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
- 广播变量只能在Driver端定义,不能在Executor端定义。
- 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值
- 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
val countryContactCounts = contactCounts.map{case (sign, count) => {
val country = lookupInArray(sign, signPrefixes.value)
(country, count)
}}.reduceByKey((x, y) => x+y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量
final Broadcast signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD countryContactCounts = contactCounts.mapToPair(new PairFunction, String, Integer>() {
public Tuple2 call(Tuple2 callSignCount) {
String sign = callSignCount._1();
String country = lookupCountry(sign, signPrefixes.value());
return new Tuple2(country, callSignCount._2());
}
}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器
原理
- 累加器在Driver端定义赋初始值。
- 累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
- 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
- 驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 //累加器加1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
java
JavaRDD rdd = sc.textFile(args[1]);
final Accumulator blankLines = sc.accumulator(0);
JavaRDD callSigns = rdd.flatMap(new FlatMapFunction() {
public Iterable call(String line) {
if ("".equals(line)) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}
});
callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
文章标题:9.sparkcore之共享变量-创新互联
链接分享:http://scyanting.com/article/ecpis.html