flinksqlenv的定义

本篇内容介绍了“flinksql env的定义”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联公司主要从事成都网站设计、做网站、成都外贸网站建设公司、网页设计、企业做网站、公司建网站等业务。立足成都服务六安,10多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108

1、编写 pom



    4.0.0

    org.example
    flinksqldemo
    1.0-SNAPSHOT


    
        
        UTF-8
        UTF-8

        2.11
        2.11.8
        0.10.2.1
        1.12.0
        2.7.3

        
        compile
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    



    
        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            1.12.0

        

        
            org.apache.flink
            flink-java
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-streaming-java_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-clients_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-filesystem_${scala.binary.version}
            ${flink.version}
        
        
        

        
        
            org.apache.kafka
            kafka_${scala.binary.version}
            ${kafka.version}
            ${setting.scope}
        
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${setting.scope}
        
        

        
            org.slf4j
            slf4j-api
            1.7.25
        
        
            com.alibaba
            fastjson
            1.2.72
        
        
            redis.clients
            jedis
            2.7.3
        
        
            com.google.guava
            guava
            29.0-jre
        

    

2、编写代码

package com.jd.data;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableApiDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");

//        1、创建表执行环节
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//        ==============================================
//        1.1 老版本planner的流式查询
        EnvironmentSettings set = EnvironmentSettings.newInstance()
                .useOldPlanner() //用老版本
                .inStreamingMode() //流式处理
                .build();

//        老版本的流式处理执行环境
        StreamTableEnvironment oldStreamingEnv = StreamTableEnvironment.create(env, set);

//      1.2 老版本批处理环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment);

//        =========================================================

//        1.3 blink 版本的流式查询

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings);

//        1.4 blink 版本的批处理查询
        EnvironmentSettings bsettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment blinkBatchTableEnvironment = TableEnvironment.create(settings);

    }
}

“flinksql env的定义”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


分享文章:flinksqlenv的定义
文章起源:http://scyanting.com/article/pjeeoh.html