1、如何用flink的table和sql​构建pom文件

这篇文章主要讲解了“1、如何用flink的table和sql构建pom文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“1、如何用flink的table和sql构建pom文件”吧!

成都创新互联公司服务项目包括江干网站建设、江干网站制作、江干网页制作以及江干网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,江干网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到江干省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

构建pom文件



    4.0.0

    org.example
    flinksqldemo
    1.0-SNAPSHOT


    
        
        UTF-8
        UTF-8

        2.11
        2.11.8
        0.10.2.1
        1.12.0
        2.7.3

        
        compile
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    



    
        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            1.12.0

        

        
            org.apache.flink
            flink-java
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-streaming-java_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-clients_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-filesystem_${scala.binary.version}
            ${flink.version}
        
        
        

        
        
            org.apache.kafka
            kafka_${scala.binary.version}
            ${kafka.version}
            ${setting.scope}
        
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${setting.scope}
        
        

        
            org.slf4j
            slf4j-api
            1.7.25
        
        
            com.alibaba
            fastjson
            1.2.72
        
        
            redis.clients
            jedis
            2.7.3
        
        
            com.google.guava
            guava
            29.0-jre
        

    

2、编写代码

package com.jd.data;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
//        DataStreamSource stream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator map = stream.map(new MapFunction() {

            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], split[1], split[2]);
            }
        });



        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        使用 table api
//        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
//        Table select = table.select("a,b");

//        使用 sql api
        tableEnv.createTemporaryView("test", map);
        Table select = tableEnv.sqlQuery(" select a, b from test");


        DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
        sensorReading2DataStream.map(new MapFunction() {
            @Override
            public Object map(SensorReading2 value) throws Exception {
                System.out.println(value.a+"   "+ value.b);
                return null;
            }
        });
        env.execute();


    }
}
package com.jd.data;

public class SensorReading {
    public String a;
    public String b;
    public String c;

    public SensorReading(){

    }

    public SensorReading(String a, String b, String c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }

    public String getC() {
        return c;
    }

    public void setC(String c) {
        this.c = c;
    }
}
package com.jd.data;

public class SensorReading2 {
    public String a;
    public String b;

    public SensorReading2(){

    }

    public SensorReading2(String a, String b) {
        this.a = a;
        this.b = b;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }


}

注意:pojo 中属性必须是public的, 包含无参构造器

感谢各位的阅读,以上就是“1、如何用flink的table和sql构建pom文件”的内容了,经过本文的学习后,相信大家对1、如何用flink的table和sql构建pom文件这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!


当前名称:1、如何用flink的table和sql​构建pom文件
文章链接:http://scyanting.com/article/iegiei.html