sparkstreaming测试之二使用网络数据源-创新互联
测试思路:
成都创新互联公司是一家专注于网站建设、网站设计与策划设计,鹤山网站建设哪家好?成都创新互联公司做网站,专注于网站建设十余年,网设计领域的专业建站公司;建站业务涵盖:鹤山等地区。鹤山做网站价格咨询:18980820575首先,创建网络数据源数据发送器(程序一);
其次,创建spark接收数据程序(程序二);
接着,将程序一打包,放在服务器上执行。这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据;
最后,运行spark程序,这里每隔5秒处理一次数据。有两个参数:监听的端口号,每隔多少毫秒接收一次数据。
观察效果。
程序一:
sparkStreaming import java.io.PrintWriter import java.net.ServerSocket import scala.io.Source object SalaSimulation { (length: ) = { java.util.Random rdm = Random rdm.nextInt(length) } (args: Array[]){ (args.length != ){ System..println() System.() } filename = args() lines = Source.(filename).getLines.toList filerow = lines.length listener = ServerSocket(args().toInt) (){ socket = listener.accept() Thread(){ = { (+socket.getInetAddress) out = PrintWriter(socket.getOutputStream()) (){ Thread.(args().toLong) content = lines((filerow)) (content) out.write(content +) out.flush() } socket.close() } }.start() } } }
程序二:
sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{SecondsStreamingContext} import org.apache.spark.{SparkContextSparkConf} import org.apache.spark.streaming.StreamingContext._ object NetworkWordCount { def main(args: Array[]){ Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF) conf = SparkConf().setAppName().setMaster() sc = SparkContext(conf) ssc = StreamingContext(sc()) lines = ssc.socketTextStream(args()args().toIntStorageLevel.) words = lines.flatMap(_.split()) wordCounts = words.map(x=>(x)).reduceByKey(_+_) wordCounts.print() ssc.start() ssc.awaitTermination() } }
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
文章标题:sparkstreaming测试之二使用网络数据源-创新互联
文章分享:http://scyanting.com/article/jghgj.html