 Flink流处理之ProcessFunction
Flink流处理之ProcessFunction
  # ProcessFunction
[toc]
# 基本概念
ProcessFunction函数是一个低级的流处理函数,可以将其看做一个具有Keyed状态和定时访问权限的FlatMapFunction,它通过调用输入数据流中收到的每个事件(元素)来处理事件(元素)。
- 转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如我们常用的MapFunction转换算子就无法访问时间戳或者当前事件的事件时间。
- 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
- Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
# 8个ProcessFunction
- Flink提供了8个Process Function:
- **1)**ProcessFunction dataStream
- **2)**KeyedProcessFunction 用于KeyedStream,keyBy之后的流处理
- **3)**CoProcessFunction 用于connect连接的流
- **4)**ProcessJoinFunction 用于join流操作
- **5)**BroadcastProcessFunction 用于广播
- **6)**KeyedBroadcastProcessFunction keyBy之后的广播
- **7)**ProcessWindowFunction 窗口增量聚合
- **8)**ProcessAllWindowFunction 全窗口聚合
 
# KeyedProcessFunction 使用案例
# 实现功能
通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)
# 实现代码
package com.jast.flink.processfunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * KeyedProcessFunction Demo
 */
object KeyedProcessFunctionDemo {
  
  def main(args: Array[String]): Unit = {
    //创建Stream环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //通过9999socket接口接收数据
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
    //数据接收格式为:商品,价格
    //e.g. 帽子,38
    //e.g. 衣服,199
    val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1)))
      .setParallelism(4) //设置并行度为4
    typeAndData.keyBy(0) //根据key(商品)进行聚合
      .process(new MyprocessFunction()) //调用自定义KeyedProcessFunction函数
      .print("结果") // 输出函数返回结果,前面加上"结果"
    env.execute()
  }
  /**
   * 实现:
   *    根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警
   */
  class MyprocessFunction extends  KeyedProcessFunction[Tuple,(String,String),String]{
    //统计间隔时间
    val delayTime : Long = 1000 * 10
    /**
     * 状态存储变量
     * cjcount 自定义名称
     */
    lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]]))
    /**
     * 定时器
     * @name onTimer
     * @date 2022/4/6 上午11:13
     * @return void
     * @param timestamp 定时器出发的时间
     * @param ctx
     * @param out
     * @author Jast
    */
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {
      printf("定时器触发,时间为:%d,状态为:%s,key为:%s\n",timestamp,state.value(),ctx.getCurrentKey)
      if(state.value()._2==0){
        //该时间段数据为0,进行预警
        printf("类型为:%s,数据为0,预警\n",state.value()._1)
      }
      //定期数据统计完成后,清零
      state.update(state.value()._1,0)
      //再次注册定时器执行
      val currentTime: Long = ctx.timerService().currentProcessingTime()
      ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
    }
    /**
     * 处理数据方法
     * @name processElement
     * @date 2022/4/6 上午11:12
     * @return void
     * @param value 数据的数据值
     * @param ctx 存储的上下文信息
     * @param out 返回数据格式
     * @author Jast
    */
    override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = {
      printf("状态值:%s,state是否为空:%s\n",state.value(),(state.value()==null))
      if(state.value() == null){
        //当前Key首次执行状态值为空,进行初始化赋值
        //获取时间
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        //注册定时器 delayTime 秒后触发(执行onTimer方法)
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        printf("定时器注册时间:%d\n",currentTime + delayTime)
        // 更新state值
        state.update(value._1,value._2.toInt)
      } else{
        //统计数据
        val key: String = state.value()._1
        var count: Long = state.value()._2
        count += value._2.toInt
        //更新state值
        state.update((key,count))
      }
      //返回任务的名称,并附加子任务指示符,例如“MyTask(3/6)”,
      println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value)
      printf("状态值:%s\n",state.value())
      //返回处理后结果
      out.collect("处理后返回数据->"+value)
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# 测试执行
(1) 在终端启动端口准备发送数据使用
mac@Mac ~ % nc -lk 9999
1
**(2)**启动程序
**(3)**发送数据
mac@Mac ~ % nc -lk 9999
衣服,123
裤子,1234
衣服,1
1
2
3
4
2
3
4
**(4)**控制台输出内容
状态值:null,state是否为空:true
定时器注册时间:1649213908697
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,123)
状态值:(衣服,123)
结果:8> 处理后返回数据->(衣服,123)
状态值:null,state是否为空:true
定时器注册时间:1649213911821
KeyedProcess -> Sink: Print to Std. Out (4/8)->(裤子,1234)
状态值:(裤子,1234)
结果:4> 处理后返回数据->(裤子,1234)
状态值:(衣服,123),state是否为空:false
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,1)
状态值:(衣服,124)
结果:8> 处理后返回数据->(衣服,1)
定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
可以看到我们输入的内容,根据类型统计输出
定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
1
2
2
随后我们不进行数据输入,定时器触发进行预警操作
定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4
2
3
4
# CoProcessFunction
http://www.manongjc.com/detail/23-umgdbbcybtgvihr.html
# BroadcastProcessFunction
https://cloud.tencent.com/developer/article/1983497
上次更新: 2023/03/10, 17:00:47
