欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 高考 > flink left join消费kafka数据

flink left join消费kafka数据

2025/11/9 22:15:17 来源:https://blog.csdn.net/m0_37759590/article/details/139338770  浏览:    关键词:flink left join消费kafka数据

left join会产生回车流数据

在控制台数据


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author: YSKSolution* @Date: 2022/11/8/19:16* @Package_name: PACKAGE_NAME*/
public class LeftJoin extends BaseSQLAPP {public static void main(String[] args) {new LeftJoin().init(2003,2,"BaseSQLAPP");}@Overrideprotected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {//join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));tEnv.executeSql("create table t1 (" +" id int, "+" name string "+")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv"));tEnv.executeSql("create table t2 (" +" id int, "+" age int "+")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv"));Table table = tEnv.sqlQuery(" select " +"t1.id," +"t1.name," +"t2.age" +" from t1 " +" left join t2 " +" on t1.id = t2.id ");//        tEnv.createTemporaryView("result",table);table.execute().print();}
}

先输入t1数据
在这里插入图片描述
控制台数据 ,左表数据输出,右表数据为null
在这里插入图片描述
再输入右表数据
在这里插入图片描述
控制台产生两条数据,一条是回撤流,一条是join得到的数据
在这里插入图片描述
2.写入upsertkakfa消费


import com.sjfood.sjfood.gmallrealtime.app.BaseSQLAPP;
import com.sjfood.sjfood.gmallrealtime.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author: YSKSolution* @Date: 2022/11/8/19:16* @Package_name: PACKAGE_NAME*/
public class LeftJoin extends BaseSQLAPP {public static void main(String[] args) {new LeftJoin().init(2003,2,"BaseSQLAPP");}@Overrideprotected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {//join的时候,这种数据在状态中保存的时间
//        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(20));tEnv.executeSql("create table t1 (" +" id int, "+" name string "+")"+ SQLUtil.getKafkaSourceDDL("t1","t1","csv"));tEnv.executeSql("create table t2 (" +" id int, "+" age int "+")"+ SQLUtil.getKafkaSourceDDL("t2","t2","csv"));Table table = tEnv.sqlQuery(" select " +"t1.id," +"t1.name," +"t2.age" +" from t1 " +" left join t2 " +" on t1.id = t2.id ");//        tEnv.createTemporaryView("result",table);tEnv.executeSql("create table t3(" +"id int," +"name string," +"age int," +"primary key (id) not enforced"+")"+SQLUtil.getUpsertKafkaDDL("t3","json"));table.executeInsert("t3");}
}

先写左表,消费到的数据如下,右表数据为null
在这里插入图片描述
再写右表,产生两条数据,第一条是null,表示删除上面那条数据,第二条是left join得到的结果
在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词