SparkStreaming的实现和使用方法-创新互联
这篇文章主要讲解了“SparkStreaming的实现和使用方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SparkStreaming的实现和使用方法”吧!
堆龙德庆ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!一.DStream 整合RDD
1.官网算子
2.使用案例
生产中使用多的是一个文件中有很多域名,另一个中是黑名单,要进行剔除 数据一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com 数据二:已有的文件 黑名单 RDD domain baidu.com
3.RDD实现上述需求
package sparkstreaming02 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //进行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) } }
4.SparkStreaming实现
package sparkstreaming02 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable.ListBuffer object Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.socketTextStream("s201",9999) // 数据二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transform( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后过滤 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() } }
二.SparkStreaming插入外部数据源
1.插入外部数据源用的,但是使用这个有几个坑
、
2.错误一官网例子
3.原因
connect 在Driver端创建,record在executor,发过去序列化错误
4.解决
解决:第一种把connect放到executor端 这样弊端是每条记录会生成一个connect太耗费资源 words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }
5.最终解决办法
package sparkstreaming02 import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_) // words.foreachRDD { rdd => // val connection = createConnection() // executed at the driver // rdd.foreach { record => // val word = record._1 // val count = record._2 // val sql = s"insert into wc (word,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } // words.foreachRDD { rdd => // rdd.foreach { record => // val connection = createConnection() // executed at the driver // val word = record._1 // val count = record._2.toInt // val sql = s"insert into wc (wc,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } //最终的写法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456") } }
6.出现问题
错误,插入数据库时,你要插入字符串要用'' 例如: val sql = s"insert into wc (wc,count) values($word,$count)" word是字符串,你要不加双引号就报这个错误 正确 val sql = s"insert into wc (wc,count) values('$word',$count)"
感谢各位的阅读,以上就是“SparkStreaming的实现和使用方法”的内容了,经过本文的学习后,相信大家对SparkStreaming的实现和使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
网站标题:SparkStreaming的实现和使用方法-创新互联
路径分享:http://scyanting.com/article/ccjese.html