
Flink SQL

2025/10/14 4:59:01  Source: https://blog.csdn.net/weixin_63297999/article/details/144190276

Overview | Apache Flink

Flink SQL development steps

Concepts & Common API | Apache Flink

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
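Note the provided scope: the bridge artifact above only pulls in the Table API itself. To actually run Table programs from the IDE you typically also need a planner and the DataStream runtime on the classpath. For 1.13.6 with Scala 2.11 that would look roughly like the following (this artifact list is an assumption based on the standard 1.13 project setup; adjust to your build):

```xml
<!-- Assumed companions for flink-table-api-java-bridge_2.11 1.13.6 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
```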

DataStream -> Table

Option 1: register the stream as a named temporary view with tEnv.createTemporaryView(...).

Option 2: convert the stream directly into a Table object with tEnv.fromDataStream(...).
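A minimal sketch of both routes (class name, host, and port are illustrative; this requires the Flink 1.13 dependencies on the classpath, so it is not runnable standalone):

```java
package com.bigdata.sql;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamToTableSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> stream = env.socketTextStream("localhost", 8899);

        // Option 1: register the stream under a name and refer to it in SQL
        tEnv.createTemporaryView("table1", stream);

        // Option 2: obtain a Table object directly for the Table/DSL API
        Table table = tEnv.fromDataStream(stream);
    }
}
```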

Table -> DataStream
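Assuming a tEnv and a result Table like the ones in the demos that follow, the two conversions back to a DataStream are (a sketch of the 1.13 bridge API):

```java
// Insert-only results: each row is emitted once and never changed
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);

// Updating results: pairs of (flag, row); flag=false retracts a row
// previously emitted with flag=true
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
```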

Query

Table style / DSL style

SQL style
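Both styles express the same query. As a sketch (table and column names follow the word-count examples below):

```java
// SQL style: plain SQL text against a registered view
Table r1 = tEnv.sqlQuery("select word, sum(num) as sumNum from table1 group by word");

// Table/DSL style: the same aggregation built from method calls
Table r2 = tEnv.fromDataStream(mapStream, $("word"), $("num"))
               .groupBy($("word"))
               .select($("word"), $("num").sum().as("sumNum"));
```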

A simple demo:

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Demo06FlinkSQL {
    public static void main(String[] args) throws Exception {
        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. source - load the data
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8899);
        // Register the DataStream as a Table
        tEnv.createTemporaryView("table1", streamSource);
        //3. transformation - process the data
        Table table = tEnv.sqlQuery("select * from table1");
        //table.printSchema();  // prints the table schema, not the table data
        //4. To print the table's data, the Table must be converted back to a DataStream
        DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        appendStream.print();
        //5. execute
        env.execute();
    }
}

Because the DataStream carries Row elements, the printed format is determined by the toString method of the Row class. The +I here marks the record as an insert (newly added data).

Since we write Flink SQL frequently, create an IDE file template:


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ${NAME} {
    public static void main(String[] args) throws Exception {
        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. create the table object
        //3. write the SQL statement
        //4. convert the Table to a stream
        //5. execute
        env.execute();
    }
}

Based on this template, here is a word-count example:

First version:

package com.bigdata.day08;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 *  Input:   hello,5
 *           hello,10       Expected result: hello,15
 */
public class _01FlinkSQLWordCount {
    public static void main(String[] args) throws Exception {
        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. source - load the data
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);
        // Parse each line into a (word, count) pair
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                String word = arr[0];
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(word, num);
            }
        });
        // Each tuple field becomes a column (default names are f0, f1)
        tEnv.createTemporaryView("table1", mapStream, $("word"), $("num"));
        //3. SQL query
        Table tableResult = tEnv.sqlQuery("select word, sum(num) as sumNum from table1 group by word");
        //4. sink - output the data
        DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
        resultStream.print();
        //5. execute
        env.execute();
    }
}

This throws:

Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:395)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:186)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:354)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:343)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)

 

Solution:

//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);

Change it to:

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);

Second version:

package com.bigdata.day08;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 *  Input:   hello,5
 *           hello,10       Expected result: hello,15
 */
public class _01FlinkSQLWordCount {
    public static void main(String[] args) throws Exception {
        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. source - load the data
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);
        // Parse each line into a (word, count) pair
        DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] arr = s.split(",");
                String word = arr[0];
                int num = Integer.valueOf(arr[1]);
                return Tuple2.of(word, num);
            }
        });
        // Each tuple field becomes a column (default names are f0, f1)
        tEnv.createTemporaryView("table1", mapStream, $("word"), $("num"));
        //3. SQL query
        Table tableResult = tEnv.sqlQuery("select word, sum(num) as sumNum from table1 group by word");
        //4. sink - output the data
        // toAppendStream handles inserts only and cannot be used with grouping
        //DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
        // toRetractStream supports retractions, so GROUP BY and similar SQL works
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
        retractStream.print();
        //5. execute
        env.execute();
    }
}

toAppendStream: for queries that only ever append new rows and never modify previously emitted results. SQL containing a GROUP BY (or any other updating operation) cannot use it.

toRetractStream: for queries whose already-emitted results get updated as new data arrives. Each element is a Tuple2<Boolean, Row>: true means the row is being added (an insert or the new value of an update), false means the row is being retracted (the old value is withdrawn).

In the printed changelog:

+I  insert (new row)

-U  update-before (the old value, retracted)

+U  update-after (the new value)
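To make the retract semantics concrete, here is a plain-Java simulation (no Flink involved; the class and method names are mine) of the changelog a grouped sum emits for the input hello,5 followed by hello,10:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulates the (flag, row) pairs toRetractStream would emit for
// "select word, sum(num) from t group by word" as records arrive.
public class RetractChangelogDemo {
    static List<String> changelog(String[] events) {
        List<String> out = new ArrayList<>();
        Map<String, Integer> sums = new HashMap<>();
        for (String e : events) {
            String[] kv = e.split(",");
            String word = kv[0];
            int num = Integer.parseInt(kv[1]);
            Integer old = sums.get(word);
            if (old != null) {
                // retract the previously emitted aggregate (-U)
                out.add("(false," + word + "," + old + ")");
            }
            int updated = (old == null ? 0 : old) + num;
            sums.put(word, updated);
            // emit the new aggregate (+I on first sight, +U afterwards)
            out.add("(true," + word + "," + updated + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(changelog(new String[]{"hello,5", "hello,10"}));
        // prints [(true,hello,5), (false,hello,5), (true,hello,15)]
    }
}
```

The false record is why naively printing a retract stream shows "duplicates": every update arrives as a retraction of the old value followed by an insertion of the new one.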

Flink SQL API

Requirement: count the words in a DataStream using both the SQL style and the Table (DSL) style.

package com.bigdata.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class WC {
    private String word;
    private int num;
}
package com.bigdata.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class _02_WordCountDemo02 {
    public static void main(String[] args) throws Exception {
        //1. env - set up the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Obtain a table environment from env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        //2. create the table object
        DataStreamSource<WC> dataStreamSource = env.fromElements(
                new WC("hello", 5),
                new WC("hello", 10),
                new WC("world", 1),
                new WC("world", 2));
        // First approach: pure SQL
        tEnv.createTemporaryView("wordcount", dataStreamSource, $("word"), $("num"));
        //3. write the SQL statement
        Table table = tEnv.sqlQuery("select word, sum(num) wdcount from wordcount group by word");
        //4. convert the Table to a stream
        // toAppendStream does not support grouped aggregation
        //DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        //retractStream.print();

        // The query above is the SQL style; the DSL style works as well
        Table table2 = tEnv.fromDataStream(dataStreamSource, $("word"), $("num"));
        //Table resultTable = table2.groupBy($("word")).select($("word"), $("num").sum().as("wdcount"));
        Table resultTable = table2.groupBy($("word")).select($("word"), $("num").sum()).as("word", "num");
        resultTable.printSchema();
        DataStream<Tuple2<Boolean, Row>> resultStream = tEnv.toRetractStream(resultTable, Row.class);
        resultStream.print();
        resultStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
            @Override
            public String map(Tuple2<Boolean, Row> value) throws Exception {
                Row row = value.f1;
                return row.getField("word") + "," + row.getField("num");
            }
        }).print();
        DataStream<Tuple2<Boolean, WC>> wcStream = tEnv.toRetractStream(resultTable, WC.class);
        wcStream.print();
        //5. execute
        env.execute();
    }
}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
	at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:473)

If you run into this error, check whether the WC entity class has a no-args constructor.

