Overview | Apache Flink
FlinkSQL开发步骤
Concepts & Common API | Apache Flink
添加依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.13.6</version><scope>provided</scope>
</dependency>
DataStream -> 表
第一种方案:
第二种方案:
表->DataStream
查询
Table风格/DSL风格
SQL风格
进行一个简单的Demo演示:
package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class Demo06FlinkSQL {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2. source-加载数据DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8899);// 需要将DataStream变为 TabletEnv.createTemporaryView("table1",streamSource);//3. transformation-数据处理转换Table table = tEnv.sqlQuery("select * from table1");//table.printSchema(); 这个是打印表结构的,不是打印表数据的//4. 要想打印表中的数据是没办法的,需要将table转换为DataStreamDataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);appendStream.print();//5. execute-执行env.execute();}
}
因为DataStream中是Row 类型,所以打印的格式是Row 这个类中的toString方法决定的。这个地方的 +I 的意思是新增的数据。
因为我们经常编写flinksql,所以创建一个模板:
#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Skeleton for a Flink SQL job: prepares the stream and table environments
 * and leaves numbered placeholders for the table, the query and the sink.
 */
public class ${NAME} {

    public static void main(String[] args) throws Exception {
        // 1. env - prepare the stream environment and the table environment on top of it
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RuntimeExecutionMode must be imported above, otherwise the generated class does not compile
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. create the table object

        // 3. write the SQL statement

        // 4. convert the Table into a stream

        // 5. execute
        env.execute();
    }
}
根据这个可以做一个单词统计的案例:
第一版代码:
package com.bigdata.day08;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;/**** 输入的数据 hello,5* hello,10 结果是 hello,15**/
public class _01FlinkSQLWordCount {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);// 将数据进行转换DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");String word = arr[0];int num = Integer.valueOf(arr[1]);return Tuple2.of(word, num);}});// 一个单词是一列 默认是 f0tEnv.createTemporaryView("table1",mapStream,$("word"),$("num"));//3. 进行sql 查询Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");//4. sink-数据输出DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);resultStream.print();//5. execute-执行env.execute();}
}
报错:
Exception in thread "main" org.apache.flink.table.api.TableException: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:395)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:186)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:354)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:343)at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.immutable.Range.foreach(Range.scala:160)at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)at scala.collection.AbstractTraversable.map(Traversable.scala:104)
解决方案:
//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);
修改为:
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);
第二版代码:
package com.bigdata.day08;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;/*** 输入的数据 hello,5* hello,10 结果是 hello,15**/
public class _01FlinkSQLWordCount {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2. source-加载数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 6666);// 将数据进行转换DataStream<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] arr = s.split(",");String word = arr[0];int num = Integer.valueOf(arr[1]);return Tuple2.of(word, num);}});// 一个单词是一列 默认是 f0tEnv.createTemporaryView("table1",mapStream,$("word"),$("num"));//3. 进行sql 查询Table tableResult = tEnv.sqlQuery("select word,sum(num) as sumNum from table1 group by word");//4. sink-数据输出// 这个toAppendStream ,是新增的操作,不能用于分组//DataStream<Row> resultStream = tEnv.toAppendStream(tableResult, Row.class);// toRetractStream 使用这个带有缩进功能的方法,可以运行group by 等sqlDataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(tableResult, Row.class);retractStream.print();//5. execute-执行env.execute();}
}
toAppendStream: 适用于只会产生新结果、不会修改旧结果的查询(仅追加)。使用该方法时,SQL 语句中不能出现分组聚合(group by)。
toRetractStream: 适用于需要对已经计算出的结果进行更新的查询(例如分组聚合)。返回的 Tuple2 中,Boolean 为 true 代表新增或更新后的数据,为 false 代表被撤回(作废)的旧数据。
+I 表示新增
-U 更新前
+U 更新后
FlinkSQL-API :
需求: 使用SQL和Table(DSL)两种方式对DataStream中的单词进行统计
package com.bigdata.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Word/count pair used as the element type of the word-count demo stream.
 *
 * <p>Lombok generates the accessors and both the all-args and the no-args
 * constructor for this class.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WC {

    /** The word being counted. */
    private String word;

    /** The number attached to the word. */
    private int num;
}
package com.bigdata.sql;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;public class _02_WordCountDemo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//2. 创建表对象DataStreamSource<WC> dataStreamSource = env.fromElements(new WC("hello",5),new WC("hello",10),new WC("world",1),new WC("world",2));// 第一种:使用纯sql的写法tEnv.createTemporaryView("wordcount",dataStreamSource,$("word"),$("num"));//3. 编写sql语句Table table = tEnv.sqlQuery("select word,sum(num) wdcount from wordcount group by word ");//4. 将Table变为stream流// toAppendStream 不支持分组聚合操作//DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);//retractStream.print();// 以上这些是SQL的写法,它还支持DSL的写法Table table2 = tEnv.fromDataStream(dataStreamSource,$("word"),$("num"));//Table resultTable = table2.groupBy($("word")).select($("word"), $("num").sum().as("wdcount"));Table resultTable = table2.groupBy($("word")).select($("word"), $("num").sum()).as("word","num");resultTable.printSchema();DataStream<Tuple2<Boolean, Row>> resultStream= tEnv.toRetractStream(resultTable, Row.class);resultStream.print();resultStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {@Overridepublic String map(Tuple2<Boolean, Row> value) throws Exception {Row row = value.f1;return row.getField("word")+","+row.getField("num");}}).print();DataStream<Tuple2<Boolean, WC>> wcStream= tEnv.toRetractStream(resultTable, WC.class);wcStream.print();//5. execute-执行env.execute();}
}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.
    at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:473)
假如遇到以上错误,请检查 WC 实体类中是否有无参构造方法。缺少无参构造方法时,Flink 无法把 WC 识别为 POJO,只能把它当作单字段的原子类型处理,因此引用 word、num 两个字段时就会报这个错。