eclipse开发spark的详细过程

本篇内容主要讲解“eclipse开发spark的详细过程”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“eclipse开发spark的详细过程”吧!

成都创新互联公司专注于临桂网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供临桂营销型网站建设,临桂网站制作、临桂网页设计、临桂网站官网定制、微信小程序开发服务,打造临桂网络公司原创品牌,更为您提供临桂网站排名全网营销落地服务。

一、搭建环境

eclispe安装scala-ide插件

二、读取es和MySQL

首先添加pom:


	4.0.0
	test
	test
	0.0.1-SNAPSHOT
	spark

	
		2.11.8
		2.2.0
		2.11

		18.0
	

	
		
			junit
			junit
			3.8.1
			test
		

		
			org.apache.spark
			spark-core_${spark.artifactId.version}
			${spark.version}
		

		
			org.apache.spark
			spark-sql_${spark.artifactId.version}
			${spark.version}
		

		
			org.scala-lang
			scala-compiler
			${scala.version}
			
			provided
		

		
			com.alibaba
			fastjson
			1.2.29
		

		
			org.elasticsearch
			elasticsearch-spark-20_${spark.artifactId.version}
			6.2.0
			compile
			
				
					log4j-over-slf4j
					org.slf4j
				
			
		

		
			mysql
			mysql-connector-java
			5.1.6
		

		
			org.scala-lang
			scala-library
			${scala.version}
		

		
		
			org.slf4j
			slf4j-api
			1.6.4
		
		
			org.slf4j
			slf4j-log4j12
			1.7.25
		
	

	
		
			
				org.apache.maven.plugins
				maven-compiler-plugin
				3.6.1
				
					1.8
					1.8
				
			
			
				net.alchim31.maven
				scala-maven-plugin
				3.2.2
			
			
				org.apache.maven.plugins
				maven-jar-plugin
				3.0.2
				
					
						
							true
							lib/
							spark.example.Main
						
					
				
			
			
				org.apache.maven.plugins
				maven-dependency-plugin
				3.0.0
				
					
						package
						
							copy-dependencies
						
						
							${project.build.directory}/lib
						
					
				
			
		
	

然后写主函数:

package test

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import java.util.Properties

object querySql {
  def main(args: Array[String]): Unit = {
    //  读取mysql数据:

    val spark = SparkSession.builder().appName("Java Spark MYSQL basic example")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") //不解析日期类型
      .getOrCreate()

    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "sys_user";
    val props = new Properties()
    props.setProperty("dbtable", table) // 设置表
    props.setProperty("user", "root") // 设置用户名
    props.setProperty("password", "123456") // 设置密码

    //    val df = spark.read.jdbc(url, table, props)
    //    df.show()

    //添加筛选条件
    //   val filter = df.filter(col("TABLE_ID").gt("10"));
    //    System.out.println("mysql count:" + filter.count());

    val esRows = spark.read.format("org.elasticsearch.spark.sql").load("visitlog/_doc")
    //      esRows.show()

    esRows.createOrReplaceGlobalTempView("table1");

    //       val subDf = spark.sql("SELECT userId,ip,createTime,createTime2 FROM global_temp.table1")
    val subDf = spark.sql("SELECT userId,count(userId) FROM global_temp.table1 group by userId")

    subDf.show();

    spark.close();
  }
}

三、打包执行

打包命令:mvn clean scala:compile package

执行命令:java -Djava.ext.dirs=lib -cp test-0.0.1-SNAPSHOT.jar  test.querySql

到此,相信大家对“eclipse开发spark的详细过程”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


分享名称:eclipse开发spark的详细过程
URL链接:http://scyanting.com/article/pciohd.html