flinksql怎么实时计算当天pv写入mysql

这篇文章主要讲解了“flink sql怎么实时计算当天pv写入MySQL”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink sql怎么实时计算当天pv写入mysql”吧!

创新互联公司是一家专注于网站制作、成都网站建设与策划设计,茫崖网站建设哪家好?创新互联公司做网站,专注于网站建设十多年,网设计领域的专业建站公司;建站业务涵盖:茫崖等地区。茫崖做网站价格咨询:13518219792

首先我们还是使用datagen生成测试数据,随机生成一些用户id

     String sourceSql = "CREATE TABLE datagen (\n" +
               " userid int,\n" +
               " proctime as PROCTIME()\n" +
               ") WITH (\n" +
               " 'connector' = 'datagen',\n" +
               " 'rows-per-second'='100',\n" +
               " 'fields.userid.kind'='random',\n" +
               " 'fields.userid.min'='1',\n" +
               " 'fields.userid.max'='100'\n" +
               ")";

定义mysql的sink,这里mysql是作为了一个upsert的sink,所以必须要一个主键,在mysql建表的时候我们指定了当天的日期作为主键,mysql ddl如下

CREATE TABLE `pv` (
 `day_str` varchar(100) NOT NULL,
 `pv` bigint(10) DEFAULT NULL,
 PRIMARY KEY (`day_str`)
)

Flink中的ddl要和mysql中对的上,也要指定主键。

    String mysqlsql = "CREATE TABLE pv (\n" +
               "  day_str STRING,\n" +
               "  pv bigINT,\n" +
               "  PRIMARY KEY (day_str) NOT ENFORCED\n" +
               ") WITH (\n" +
               "   'connector' = 'jdbc',\n" +
               "   'username' = 'root',\n" +
               "   'password' = 'root',\n" +
               "   'url' = 'jdbc:mysql://localhost:3306/test',\n" +
               "   'table-name' = 'pv'\n" +
               ")";

接下来我们写一个简单的查询:

     tEnv.executeSql("insert into pv SELECT DATE_FORMAT(proctime, 'yyyy-MM-dd') as day_str, count(*) \n" +
               "FROM datagen \n" +
               "GROUP BY DATE_FORMAT(proctime, 'yyyy-MM-dd')");

可能对于以前一直做批处理的同学来说会感到疑惑,对于流式处理来说,group by将会返回一个可撤回流(RetractStream),转化成datastream,将会得到一个Tuple2对象,这个对象第一个字段如果是false表示数据要撤回,true表示数据是我们新添加的,第二个字段是实际的数据。在这里,我们将这个实时更新的结果写入到了mysql。这样mysql表,每天就会只有一个数据,系统会不断地更新pv字段。

flink sql怎么实时计算当天pv写入mysql

类似的需求我们还可以使用flink的窗口来实现,定义一个窗口周期是一天的窗口,然后自定义一个触发器,比如每秒钟触发一次,然后将结果输出写入第三方sink。

感谢各位的阅读,以上就是“flink sql怎么实时计算当天pv写入mysql”的内容了,经过本文的学习后,相信大家对flink sql怎么实时计算当天pv写入mysql这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


本文名称:flinksql怎么实时计算当天pv写入mysql
文章位置:http://scyanting.com/article/gopcsi.html