SparkStreaming Kafka 自动保存offset到zookeeper
# SparkStreaming Kafka 自动保存offset到zookeeper
# 场景
本文使用的 Spark 版本为 1.6。Spark Streaming 1.6 使用的 Kafka jar 包为 0.8 版本,消费时不会记录已消费到的 offset,程序重启后会导致重复消费,因此需要手动将 offset 保存到 ZooKeeper。Spark Streaming 2.1.1 使用的 Kafka jar 包为 0.10 版本,没有出现这种状况。以下是 1.6 版本的消费代码:
package com.zsh.spark.streaming
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZKGroupTopicDirs
import org.apache.spark.streaming.kafka.OffsetRange
import org.apache.spark.streaming.kafka.HasOffsetRanges
import kafka.utils.ZkUtils
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadataRequest
import kafka.api.PartitionOffsetRequestInfo
import kafka.api.OffsetRequest
import java.util.Properties
import java.io.FileInputStream
/**
 * Spark Streaming 1.6 / Kafka 0.8 direct-stream consumer that keeps its own
 * consumer offsets in ZooKeeper.
 *
 * On start-up the job looks for previously stored offsets under
 * `/consumers/<group>/offsets/<topic>/<partition>`. If any exist, the stream is
 * resumed from them (clamped to the earliest offset still available on the
 * broker, which avoids an OffsetOutOfRange error when old segments have been
 * deleted). Otherwise the stream starts according to `auto.offset.reset`.
 *
 * After every batch has been processed, the `untilOffset` of each partition is
 * written back to ZooKeeper, so a restart continues with the first message that
 * has not been processed yet.
 *
 * NOTE(review): the ZooKeeper offset directory is derived from the raw value of
 * `kafka.consumer.topic` and the partition id only, so the stored layout is
 * only meaningful when that property names a single topic.
 */
object Kafka2Es {

  /** Socket timeout (ms) passed to every `SimpleConsumer` created here. */
  private val ConsumerSoTimeoutMs = 10000

  /** Receive buffer size (bytes) passed to every `SimpleConsumer` created here. */
  private val ConsumerBufferSize = 10000

  /**
   * Entry point: builds the streaming context, restores offsets from ZooKeeper,
   * consumes the topic and persists the processed offsets after each batch.
   *
   * @param args command line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val properties = loadConfig("/config.properties")

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
    // When running from an IDE, keep the local core count above 2.
    sparkConf.setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val brokers = requiredProperty(properties, "kafka.brokers") // ip:port,ip2:port2
    val zookeeper = requiredProperty(properties, "zookeeper.node") // ip:port,ip2:port2
    val topics = requiredProperty(properties, "kafka.consumer.topic")
    val groupId = requiredProperty(properties, "kafka.consumer.group")

    // The first broker of the list is used as the bootstrap node for metadata lookups.
    val firstBroker = brokers.split(",")(0).split(":")
    require(firstBroker.length == 2, s"kafka.brokers must look like host:port[,host:port], got '$brokers'")
    val kfkHost = firstBroker(0).trim
    val kfkPort = firstBroker(1).trim.toInt

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "auto.offset.reset" -> "smallest"
    )

    // Resolves to /consumers/<group>/offsets/<topic>.
    val topicDirs = new ZKGroupTopicDirs(groupId, topics)
    val offsetDir = topicDirs.consumerOffsetDir
    println("Group Offset在zookeeper路径为:" + offsetDir)

    val zkClient = new ZkClient(zookeeper)
    // One child node per partition is created the first time offsets are saved.
    val children = zkClient.countChildren(offsetDir)
    println("children size is " + children)

    val fromOffsets: Map[TopicAndPartition, Long] =
      if (children > 0) resolveStartOffsets(zkClient, offsetDir, topics, kfkHost, kfkPort)
      else Map.empty[TopicAndPartition, Long]

    val kafkaStream: InputDStream[(String, String)] =
      if (fromOffsets.nonEmpty) {
        // Turns every Kafka record into a (topic, message) tuple.
        val messageHandler =
          (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      } else {
        // Nothing stored yet: start from the position selected by auto.offset.reset.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicSet)
      }

    kafkaStream.foreachRDD { rdd =>
      // The RDDs of a direct stream carry the Kafka offset ranges they were built from.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { messages =>
        messages.foreach { record =>
          val value = record._2.toString
          println(s"@^_^@ [" + value + "] @^_^@")
        }
      }
      rdd.map(record => record._1 + "%%%%" + record._2).foreach(println)

      // Persist offsets only after the batch has been processed, and store the
      // end of the range so that a restart does not replay this batch.
      for (o <- offsetRanges) {
        val zkPath = s"$offsetDir/${o.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
        println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######")
      }
    }

    try {
      ssc.start()
      ssc.awaitTermination()
    } finally {
      zkClient.close()
    }
  }

  /**
   * Builds a `TopicAndPartition -> offset` map from a list of
   * `(topic, partition, offset)` triples. When the same topic/partition occurs
   * more than once, the last entry wins.
   *
   * @param list triples of topic name, partition id and starting offset
   * @return the starting offsets keyed by topic and partition
   */
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] =
    list.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset
    }.toMap

  /**
   * Looks up the leader broker host of every partition of `topic`.
   *
   * @param topic   topic whose partitions are inspected
   * @param kfkHost host of a broker used to request the topic metadata
   * @param kfkPort port of that broker
   * @return partition id -> leader host; empty when the broker returns no metadata
   * @throws IllegalStateException when a partition currently has no leader
   */
  def getPartitionLeader(topic: String, kfkHost: String, kfkPort: Int): Map[Int, String] =
    lookupPartitionLeaders(topic, kfkHost, kfkPort).map {
      case (partition, (host, _)) => partition -> host
    }

  /** Loads a classpath properties resource, closing the stream afterwards. */
  private def loadConfig(resource: String): Properties = {
    val in = Kafka2Es.getClass.getResourceAsStream(resource)
    require(in != null, s"Configuration resource '$resource' was not found on the classpath")
    try {
      val properties = new Properties()
      properties.load(in)
      properties
    } finally {
      in.close()
    }
  }

  /** Returns the trimmed value of `key`, failing with a clear message when it is absent or blank. */
  private def requiredProperty(properties: Properties, key: String): String =
    Option(properties.getProperty(key)).map(_.trim).filter(_.nonEmpty).getOrElse(
      throw new IllegalArgumentException(s"Missing required property '$key' in config.properties"))

  /** Requests the topic metadata and returns partition id -> (leader host, leader port). */
  private def lookupPartitionLeaders(
      topic: String,
      kfkHost: String,
      kfkPort: Int): Map[Int, (String, Int)] = {
    val request = new TopicMetadataRequest(List(topic), 0)
    val consumer =
      new SimpleConsumer(kfkHost, kfkPort, ConsumerSoTimeoutMs, ConsumerBufferSize, "OffsetLookup")
    try {
      consumer.send(request).topicsMetadata.headOption match {
        case Some(topicMetadata) =>
          topicMetadata.partitionsMetadata.map { pm =>
            val leader = pm.leader.getOrElse(
              throw new IllegalStateException(
                s"Partition ${pm.partitionId} of topic '$topic' has no leader"))
            pm.partitionId -> ((leader.host, leader.port))
          }.toMap
        case None =>
          Map.empty[Int, (String, Int)]
      }
    } finally {
      consumer.close()
    }
  }

  /** Asks the partition leader for the earliest offset it still holds, if it reports one. */
  private def earliestOffset(
      tp: TopicAndPartition,
      leaderHost: String,
      leaderPort: Int): Option[Long] = {
    val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    val consumer =
      new SimpleConsumer(leaderHost, leaderPort, ConsumerSoTimeoutMs, ConsumerBufferSize, "getMinOffset")
    try {
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets.get(tp).flatMap(_.offsets.headOption)
    } finally {
      consumer.close()
    }
  }

  /** Reads the offset stored at `path`; `None` when the node does not exist. */
  private def readStoredOffset(zkClient: ZkClient, path: String): Option[Long] =
    Option(zkClient.readData[String](path, true)).map(_.trim.toLong)

  /**
   * Computes the starting offset of every partition of `topic`: the offset
   * stored in ZooKeeper, raised to the earliest offset available on the leader
   * when the stored one has already expired. Partitions without a stored
   * offset start from the earliest available offset.
   */
  private def resolveStartOffsets(
      zkClient: ZkClient,
      offsetDir: String,
      topic: String,
      kfkHost: String,
      kfkPort: Int): Map[TopicAndPartition, Long] =
    lookupPartitionLeaders(topic, kfkHost, kfkPort)
      .foldLeft(Map.empty[TopicAndPartition, Long]) {
        case (resolved, (partition, (leaderHost, leaderPort))) =>
          val tp = TopicAndPartition(topic, partition)
          val stored = readStoredOffset(zkClient, s"$offsetDir/$partition")
          val earliest = earliestOffset(tp, leaderHost, leaderPort)
          earliest.foreach(offset => println("最低:" + offset))

          val startOffset = (stored, earliest) match {
            case (Some(s), Some(e)) => math.max(s, e)
            case (Some(s), None) => s
            case (None, Some(e)) => e
            case (None, None) => 0L
          }

          val updated = resolved + (tp -> startOffset)
          println("ZshfromOffsets:" + updated)
          updated
      }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# pom.xml
<!-- Maven dependencies for the Spark 1.6.2 streaming job (artifacts carry the _2.10 suffix). -->
<!-- Kafka integration for Spark Streaming (provides the org.apache.spark.streaming.kafka package). -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<!-- Spark Streaming itself; its transitive org.scala-lang:scala-library dependency is excluded. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark core; the 2.1.1 version is kept below, commented out, as an alternative. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
<!-- <version>2.1.1</version> -->
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上次更新: 2023/03/10, 16:49:38