flink例子-读取数据库
private final static Logger logger = LoggerFactory.getLogger(GetData.class);
public static void main(String[] arg) throws Exception {
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.MySQL.jdbc.Driver")
.setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")
.setUsername("*")
.setPassword("*")
.setQuery("select name from words")
.setRowTypeInfo(rowTypeInfo)
.finish();
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource s = env.createInput(jdbcInputFormat); // datasource
BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());
tableEnv.registerDataSet("t2", s);
tableEnv.sqlQuery("select * from t2").printSchema();
Table query = tableEnv.sqlQuery("select * from t2");
DataSet result = tableEnv.toDataSet(query, Row.class);
result.print();
System.out.println(s.count());
}
通过插件将所需的类打到一个jar中
10年积累的成都做网站、网站建设经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站制作后付款的网站建设流程,更有黄岛免费网站建设让你可以放心的选择与我们合作。
maven-assembly-plugin
false
jar-with-dependencies
*
make-assembly
package
assembly
然后执行
./bin/flink run /flink-1.8.0/collector-api-0.1.jar
分享文章:flink例子-读取数据库
地址分享:http://scyanting.com/article/gecghi.html