欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > [实战-12] flinkSql 时间属性

[实战-12] flinkSql 时间属性

2025/5/11 17:53:00 来源:https://blog.csdn.net/qq_36066039/article/details/143390026  浏览:    关键词:[实战-12] flinkSql 时间属性

flinkSql 时间属性

  • 一. 摘要
  • 二.flink 时间数据类型
      • 2.1. TIMESTAMP(n)
      • 2.2. TIMESTAMP_LTZ(n)
  • 三.flink 受时区影响的时间函数
  • 五. time Attributes
    • 5.1. event_time
    • 5.2. process_time
  • 六. event_time 时间字段转化
  • 六. process_time 时间字段转化
  • 七.参考链接
    • flink-doc-time-attr [time-zone](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/timezone/)

一. 摘要

在阅读下面知识之前,读者需要先看 Flinksql设置时区

二.flink 时间数据类型

flink兼容很多上游数据库,一般要求数据库中的字段类型为datetime或者是timestamp的时候,flinksql create table的时候可以用TIMESTAMP()或者TIMESTAMP_ltz()接收
如果上游数据是string或者是long则需要转化,这块后面再说。

2.1. TIMESTAMP(n)

是 TIMESTAMP(n) WITHOUT TIME ZONE 的简写,不携带时区信息。意思是你再源数据库中看到的yyyy-MM-dd
hh:MM:ss是什么样子, 在flink中看到的就是什么样子。换句话说
TIMESTAMP(n)相当于一个字符串类型,无论作业时区怎么变,得到的字符串是不变的。

注意:TIMETSAMP(n) 在flink中是 字符串,字符串,字符串,重要的事情说三遍

2.2. TIMESTAMP_LTZ(n)

是TIMESTAMP§ WITH LOCAL TIME ZONE的简写, 是一个全球统一的时间点类型,
它底层不是字符串,属于Bigint类型。如果将其转为字符串,则结果会随着作业时区改变。
flinksql查看执行结果的时候,不管是TIMESTAMP()还是TIMESTAMP_LTZ我们看到的都是字符串.
TIMESTAMP_LTZ展示给我们的 字符串 会随着时区的table.local-time-zone 的设置而变化。这点要注意哦。
而TIMESTAMP(n)底层本身就是 字符串 , 是和时区没有关系的。 这就是两个的区别。

注意:TIMETSAMP(n) 在flink中是 long,long,long,重要的事情说三遍

三.flink 受时区影响的时间函数

函数例子
LOCALTIME15:18:36
LOCALTIMESTAMP2021-04-15 15:18:36.384
CURRENT_DATE2021-04-15
CURRENT_TIME15:18:36.384
CURRENT_TIMESTAMP2021-04-15 15:18:36.384
CURRENT_ROW_TIMESTAMP()2021-04-15 15:18:36.384
NOW()2021-04-15 15:18:36.384
PROCTIME()2021-04-15 15:18:36.384
  • TIMESTAMP(n) 对flink来说是string不受到时区的影响
    不管如何设置时区,我们看到的日期字符串都是不变的
  • TIMESTAMP_LTZ(n) 对flink来说是long,受到时区影响
    对我们来说看到的都是yyyy-MM-dd hh:MM:ss格式的字符串,
    long->可见的字符串这个过程 是受到时区影响的 因此时区不同我们看到的字符串也是不同的。

五. time Attributes

我们知道,flink常见的事件事件和处理时间, 二者都可以用来开窗口计算。
event_time和process_time都是时间类型,被称为time Attributes(时间属性)

5.1. event_time

可用必须是timestamp()或者是timestamp_ltz 的类型

5.2. process_time

flink-1.13之前PROCTIME()  用的是TIMESTAMP(3)字符串类型,且这个字符串是UTC时区转化的字符串
flink-1.13只后PROCTIME() 用的是TIMESTAMP_LTZ() 会自动按照当前sessoin的时区转换。

对flink来说外界数据时间,需要转化为fink识别的TIMESTAMP或者TIMESTAMP_LTZ(),才可以进行flink后续的窗口计算。窗口以及水位线的声明需要依赖转化后的 时间字段

六. event_time 时间字段转化

  • DDL 指定水位线和进行时间字段的转化

    	CREATE TABLE user_actions (time_str STRING,t1 as TO_TIMESTAMP(time_str,'yyyy-MM-dd HH:mm:ss'),t2 AS TO_TIMESTAMP_LTZ(time_long,3),time_datetime TIMESTAMP(3),//WATERMARK FOR t1 AS t1 - INTERVAL '5' SECOND,// WATERMARK FOR t2 AS t2 - INTERVAL '5' SECOND,//WATERMARK FOR time_datetime  AS time_datetime - INTERVAL '5' SECOND,) WITH (...);

    主要分为几种情况

    1. 数据库中是yyyy-MM-dd hh:MM:dd 的string
      需要借助函数TO_TIMESTAMP(字符串字段,'yyyy-MM-dd hh:MM:ss)
    2. 数据库是datetime或者timestamp
      可直接被识别为TIMESTAMP(n)不需要转化
    3. 数据库中是long 时间戳
      需要借助函数需要借助函数TO_TIMESTAMP_LTZ(long字段,精度)
      ts AS TO_TIMESTAMP_LTZ(time_long,3) 意思是将time_long 按照flink中设置的时区转化为 flink可识别的TIMESTAMP_LTZ(3)
      并将转化后的值 命名为ts字段

    注意上面sql中被注释的部分,意思是转化后的时间字段,就可以被用于水位线设置了

  • fromDataStream 或fromChangeStream()的时候
    这种情况必须借助 Scheme 定义水位线和时间属性

  1. 如果DatasStream 提前定义了事件事件和水位线,则Scheme可直接提取
    DataStream Api中有个隐藏是rowtime 属性,该属性其实就是event_time, 我们可以通过以下方式直接获取
           Schema.newBuilder().columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") // 获取元数据中的rowtime属性,其实就是event_time.watermark("rowtime", "SOURCE_WATERMARK()")  // 意思是获取DataStream的水位线传播到Table Api.build()
  1. 如果DatasStream 没有定义事件事件和水位线,此时需要在转换的时候重新定义水位线
    假设DataStream中数据有个字段叫做my_time是个long类型的时间戳,则通过以下方式定义新的水位线
        Schema s1=Schema.newBuilder().columnByExpression("myrowtime", "CAST(my_time AS TIMESTAMP_LTZ(0))")// .columnByExpression("myrowtime", "TO_TIMESTAMP(my_time)")//  .columnByExpression("myrowtime", "TO_TIMESTAMP_LTZ(my_time,3)") .watermark("myrowtime", "myrowtime- INTERVAL '10' SECOND").build();string类型也可以用TO_TIMESTAMP(my_time) 转换long类型也可以用TO_TIMESTAMP_LTZ(my_time,3)转换CAST那种写法比较暴力,直接是类型转换

六. process_time 时间字段转化

比较简单,直接用 PROCTIME() 函数就行了

  • DDL

    	CREATE TABLE user_actions (process_time AS PROCTIME() ) WITH (...);```
  • fromDataStream

      Schema schema = Schema.newBuilder().columnByExpression("proctime", "PROCTIME()").build();
    

七.参考链接


flink-doc-time-attr
time-zone

热搜词