# Consuming Kafka with Flink
[toc]
# Consuming Kafka Data
The code below implements two things:
- Consuming Kafka and reading only the record payload
- Consuming Kafka and reading both the payload and the record metadata (topic, partition, offset, key, and so on)
# Code Structure
Class | Description |
---|---|
FlinkUtilsScala | Utility class for creating Kafka streams |
JastKafkaDeserializationSchema | Custom Kafka deserialization schema that exposes consumer metadata such as key, partition, and offset |
AlertProcessWindowFunction | Processing class that consumes the Kafka records and runs the actual handling logic |
AlertApplication | Entry point |
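The examples use the Scala DataStream API and the legacy `FlinkKafkaConsumer` connector. A rough sbt dependency sketch follows; the versions are assumptions (the original does not state them), so adjust them to your environment:

```scala
// build.sbt -- versions are assumptions, not taken from the original project
val flinkVersion = "1.12.7"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-clients"         % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "cn.hutool"         % "hutool-all"            % "5.8.11"
)
```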
# Kafka Consumer Utility Class
```scala
package com.jast.util

import java.util.Properties

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerConfig

/**
 * @description Utility class for creating Kafka streams
 * @author Jast
 */
class FlinkUtilsScala {

  /**
   * Creates a stream that returns only the record payload, without any metadata.
   *
   * @param env        execution environment
   * @param parameters job parameters
   * @param clazz      deserialization schema class (e.g. SimpleStringSchema)
   * @param topic      comma-separated list of topics
   * @author Jast
   */
  @throws[Exception]
  def createKafkaStream[T: TypeInformation](env: StreamExecutionEnvironment, parameters: ParameterTool,
                                            clazz: Class[_ <: DeserializationSchema[T]],
                                            topic: String): DataStream[T] = {
    val (props, topics) = commonCreateKafkaStream(env, parameters, topic)
    // Kafka source
    val kafkaConsumer: FlinkKafkaConsumer[T] = new FlinkKafkaConsumer[T](topics, clazz.newInstance(), props)
    // Enabled by default: after a successful checkpoint the offsets are committed back to Kafka.
    // This is only for monitoring and may lag slightly behind the actual progress.
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
    env.addSource(kafkaConsumer)
  }

  /**
   * Creates a stream whose elements also carry the record metadata, returned as a ConsumerRecord.
   * The schema class passed in is a custom deserialization schema that extends KafkaDeserializationSchema.
   *
   * @param env        execution environment
   * @param parameters job parameters
   * @param clazz      custom deserialization schema class
   * @param topic      comma-separated list of topics
   * @author Jast
   */
  @throws[Exception]
  def createMetaStoreKafkaStream[T: TypeInformation](env: StreamExecutionEnvironment, parameters: ParameterTool,
                                                     clazz: Class[_ <: KafkaDeserializationSchema[T]],
                                                     topic: String): DataStream[T] = {
    val (props, topics) = commonCreateKafkaStream(env, parameters, topic)
    // Kafka source
    val kafkaConsumer: FlinkKafkaConsumer[T] = new FlinkKafkaConsumer[T](topics, clazz.newInstance(), props)
    // Enabled by default: after a successful checkpoint the offsets are committed back to Kafka.
    // This is only for monitoring and may lag slightly behind the actual progress.
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
    env.addSource(kafkaConsumer)
  }

  /**
   * Shared setup for creating a Kafka stream: registers the global parameters, enables
   * checkpointing with a restart strategy, and builds the consumer properties and topic list.
   */
  private def commonCreateKafkaStream(env: StreamExecutionEnvironment, parameters: ParameterTool, topic: String) = {
    // Make the parameters available to all operators
    env.getConfig.setGlobalJobParameters(parameters)
    // Enable checkpointing together with a restart strategy
    env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE)
    // Keep the checkpoint when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Number of restart attempts and delay between them: restart.attempts / delay.between.attempts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      parameters.getInt("restart.attempts", 5),
      parameters.getInt("delay.between.attempts", 3000)))
    val props = new Properties
    // Kafka broker addresses
    props.setProperty("bootstrap.servers", parameters.getRequired("kafka.bootstrap.servers"))
    // Consumer group id
    props.setProperty("group.id", parameters.getRequired("kafka.group.id"))
    // If no offset has been committed yet, start from the earliest available record
    props.setProperty("auto.offset.reset", parameters.get("kafka.auto.offset.reset", "earliest"))
    // The Kafka consumer does not auto-commit offsets; Flink commits them on checkpoints
    props.setProperty("enable.auto.commit", parameters.get("kafka.enable.auto.commit", "false"))
    props.setProperty("max.poll.records", parameters.get("max.poll.records", "1000"))
    props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000")
    val topics = new java.util.ArrayList[String]
    topic.split(",").foreach(topics.add)
    (props, topics)
  }
}

object FlinkUtilsScala {
  def apply(): FlinkUtilsScala = new FlinkUtilsScala()
}
```
# Custom Kafka Deserialization Schema
```scala
package com.jast.schema

import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * @description Custom Kafka deserialization schema used to expose the Kafka record metadata
 * @author Jast
 */
class JastKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

  override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

  @throws[Exception]
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    // Return the full consumer metadata as a ConsumerRecord[String, String]
    new ConsumerRecord(
      record.topic,
      record.partition,
      record.offset,
      record.timestamp,
      record.timestampType,
      record.checksum,
      record.serializedKeySize,
      record.serializedValueSize,
      if (record.key() == null) "" else new String(record.key, StandardCharsets.UTF_8),
      if (record.value() == null) "" else new String(record.value, StandardCharsets.UTF_8))
  }

  /**
   * Type of the produced elements.
   */
  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] =
    TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
}
```
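If you only need a few of the metadata fields, a lighter alternative (a sketch, not part of the original code; the class name is hypothetical) is to emit a tuple instead of rebuilding a full `ConsumerRecord`:

```scala
package com.jast.schema

import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/** Hypothetical variant: emits (topic, partition, offset, key, value) tuples. */
class TupleKafkaDeserializationSchema
  extends KafkaDeserializationSchema[(String, Int, Long, String, String)] {

  override def isEndOfStream(nextElement: (String, Int, Long, String, String)): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, Int, Long, String, String) = {
    // Decode key and value defensively; either may be null
    val key = if (record.key() == null) "" else new String(record.key(), StandardCharsets.UTF_8)
    val value = if (record.value() == null) "" else new String(record.value(), StandardCharsets.UTF_8)
    (record.topic(), record.partition(), record.offset(), key, value)
  }

  // Scala's type-information macro derives the TypeInformation for the tuple
  override def getProducedType: TypeInformation[(String, Int, Long, String, String)] =
    createTypeInformation[(String, Int, Long, String, String)]
}
```

It plugs into `createMetaStoreKafkaStream` the same way as `JastKafkaDeserializationSchema`, just with a tuple element type.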
# Processing Class
```scala
package com.jast.function

import java.util.concurrent.TimeUnit

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * Watch-list alert evaluation.
 */
@SerialVersionUID(1L)
class AlertProcessWindowFunction
  extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  override def open(parameters: Configuration): Unit = {
  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])],
                       out: Collector[(String, Int)]): Unit = {
    elements.foreach(element => {
      // Print the consumer metadata and payload of each record
      println(element._2.topic())
      println(element._2.partition())
      println(element._2.value())
      TimeUnit.SECONDS.sleep(10)
    })
    out.collect(("test", 0))
  }
}
```
# Configuration File
alert.properties:
```properties
# Kafka topics to consume
kafka.topics.alert=WB_1020000001,ZW_WB_1010000005,ZW_WB_1010000004,ZW_WB_1010000003,ZW_WB_1020000002,ZW_WB_1010000009
#kafka.topics.alert=WB_1020000001
# Consumer group
kafka.group.id=test2
# Kafka brokers
kafka.bootstrap.servers=192.168.60.16:9092
# Where to start consuming when there is no committed offset; defaults to earliest
kafka.auto.offset.reset=earliest
# Whether the consumer auto-commits offsets; defaults to false
kafka.enable.auto.commit=false
# HugeGraph batch write interval (ms)
flink.batch.time.milliseconds=1000
# Interval for the periodic count statistics (ms)
flink.count.time.milliseconds=10000
```
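For reference, a minimal sketch (not part of the original code) of how these keys are read through Flink's `ParameterTool`, matching the defaults used in `FlinkUtilsScala`; the file path is an example:

```scala
import org.apache.flink.api.java.utils.ParameterTool

// Load alert.properties (path is an example; adjust to your layout)
val tool = ParameterTool.fromPropertiesFile("conf/alert.properties")

// Required keys: the job fails fast if they are missing
val brokers = tool.getRequired("kafka.bootstrap.servers")
val groupId = tool.getRequired("kafka.group.id")

// Optional keys fall back to the defaults used in FlinkUtilsScala
val offsetReset  = tool.get("kafka.auto.offset.reset", "earliest")
val windowMillis = tool.getLong("flink.batch.time.milliseconds")
```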
# Entry Point
To get the record metadata, use:
```scala
val stream: DataStream[ConsumerRecord[String, String]] = FlinkUtilsScala.apply()
  .createMetaStoreKafkaStream(env, tool, classOf[JastKafkaDeserializationSchema], kafkaTopics)
```
If you only need the record payload, use (`SimpleStringSchema` comes from `org.apache.flink.api.common.serialization`):
```scala
val stream: DataStream[String] = FlinkUtilsScala.apply()
  .createKafkaStream(env, tool, classOf[SimpleStringSchema], waSourceFj1001)
```
Full code:
```scala
import java.io.File

import cn.hutool.core.io.resource.{Resource, ResourceUtil}
import cn.hutool.core.util.StrUtil
import cn.hutool.system.SystemUtil
import com.jast.function.AlertProcessWindowFunction
import com.jast.schema.JastKafkaDeserializationSchema
import com.jast.util.FlinkUtilsScala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.kafka.clients.consumer.ConsumerRecord

object AlertApplication {

  // Configuration file name
  private val envName = "alert.properties"

  def main(args: Array[String]): Unit = {
    // Treat Windows and macOS as the local development environment
    val isLocal: Boolean = SystemUtil.getOsInfo().isMac || SystemUtil.getOsInfo().isWindows
    // Parameter tool holding the job configuration
    var tool: ParameterTool = null
    if (isLocal) {
      // In development, read the configuration file from the resources directory
      val resource: Resource = ResourceUtil.getResourceObj(envName)
      println("Reading configuration file: " + resource.getUrl.getPath)
      tool = ParameterTool.fromPropertiesFile(resource.getUrl.getPath)
    } else {
      // On Linux, read the configuration file from the conf directory under the working directory
      println("Reading configuration file: " + System.getProperty("user.dir") + File.separator + "conf" + File.separator + envName)
      tool = ParameterTool.fromPropertiesFile(System.getProperty("user.dir") + File.separator + "conf" + File.separator + envName)
    }
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    if (isLocal) {
      // Use a local filesystem state backend when developing
      env.setStateBackend(new FsStateBackend("file:///Users/mac/IdeaProjects/huairou-bigdata/state-backend"))
    }
    val kafkaTopics = tool.get("kafka.topics.alert")
    if (!StrUtil.isBlankIfStr(kafkaTopics)) {
      val stream: DataStream[ConsumerRecord[String, String]] = FlinkUtilsScala.apply()
        .createMetaStoreKafkaStream(env, tool, classOf[JastKafkaDeserializationSchema], kafkaTopics)
      stream.map(value => (scala.util.Random.nextInt(10), value))
        .keyBy(_._1)
        .timeWindow(Time.milliseconds(tool.getLong("flink.batch.time.milliseconds")))
        .process(new AlertProcessWindowFunction).name("alert process")
    }
    env.execute
  }
}
```
# Troubleshooting
# NullPointerException in KafkaPartitionDiscoverer.getAllPartitionsForTopics
```
java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:507)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
```
Cause: the topic being consumed does not exist in Kafka.
Fix: create the topic.
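If you want to create the topic programmatically rather than through the Kafka CLI, here is a minimal sketch using Kafka's `AdminClient`; the partition count and replication factor are placeholders, not values from the original setup:

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.60.16:9092")

val admin = AdminClient.create(props)
try {
  // 3 partitions and replication factor 1 are placeholder values
  val topic = new NewTopic("WB_1020000001", 3, 1.toShort)
  admin.createTopics(Collections.singleton(topic)).all().get()
} finally {
  admin.close()
}
```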