# Scala Spark Streaming 2.2.0 + Kafka 0.10 (production Kafka 1.0)
[toc]
Spark 2.2 with the Kafka 0.10 integration (the API targets Kafka 0.10; the Kafka version actually running in production is 1.0).
# Code
```scala
package com.jast.test

import scala.collection.JavaConversions._
import scala.collection.mutable

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaTest3 {

  // ZooKeeper client
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      // .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // Base path under which offsets are stored in ZooKeeper
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // Make sure the given path exists in ZooKeeper; create it (with parents) if it does not
  def ensureZKPathExists(path: String) = {
    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }
  }

  // Store the new offsets
  def storeOffsets(offsetRange: Array[OffsetRange], groupName: String) = {
    for (o <- offsetRange) {
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      ensureZKPathExists(zkPath)
      // First write (or update) of the offset for this partition
      println("---Writing offset to ZK------\nTopic:" + o.topic + ", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      println("Save path: " + zkPath)
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
      println("Write succeeded")
    }
  }

  def getFromOffset(topic: Array[String], groupName: String): (Map[TopicPartition, Long], Int) = {
    // Difference between the Kafka 0.8 and 0.10 APIs: 0.10 uses TopicPartition, 0.8 uses TopicAndPartition
    var fromOffset: Map[TopicPartition, Long] = Map()
    val topic1 = topic(0).toString
    // Read the offsets saved in ZK to use as the DStream start position.
    // If the path does not exist, create it and let the DStream start from scratch.
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"
    // Make sure the path exists
    ensureZKPathExists(zkTopicPath)
    // Get the children of the topic node, i.e. the partitions
    val childrens = client.getChildren().forPath(zkTopicPath)
    // Iterate over the partitions
    val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
      p <- childrens
    } yield {
      // Read the data stored in each child node, i.e. the offset
      val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
      // Convert the offset to Long
      val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
      // Return (TopicPartition, Long)
      (new TopicPartition(topic1, Integer.parseInt(p)), offSet)
    }
    println(offSets.toMap)
    if (offSets.isEmpty) {
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }
  }

  //  if (client.checkExists().forPath(zkTopicPath) == null) {
  //    (null, 0)
  //  } else {
  //    val data = client.getData.forPath(zkTopicPath)
  //    println("----------offset info")
  //    println(data)
  //    println(data(0))
  //    println(data(1))
  //    val offSets = Map(new TopicPartition(topic1, 0) -> 7332.toLong)
  //    println(offSets)
  //    (offSets, 1)
  //  }
  // }

  def createMyZookeeperDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, Object], topic: Array[String],
                                         groupName: String): InputDStream[ConsumerRecord[String, String]] = {
    // flag = 1 means start from the offsets already stored in ZK;
    // flag = 0 means start fresh (earliest or latest, depending on the Kafka configuration)
    val (fromOffsets, flag) = getFromOffset(topic, groupName)
    var kafkaStream: InputDStream[ConsumerRecord[String, String]] = null
    if (flag == 1) {
      // Attach the message metadata
      // val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      println(fromOffsets)
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
      println(fromOffsets)
      println("Streaming resumed from stored offsets!")
    } else {
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
      println("Streaming started for the first time!")
    }
    kafkaStream
  }

  def main(args: Array[String]): Unit = {
    val processInterval = 5 // batch interval in seconds
    val brokers = "10.86.8.153:9092"
    val topics = Array("all_spider_data")
    val conf = new SparkConf().setMaster(args(0)).setAppName("kafka checkpoint zookeeper")
    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zk_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val ssc = new StreamingContext(conf, Seconds(processInterval))
    val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")
    messages.foreachRDD((rdd) => {
      if (!rdd.isEmpty()) {
        println("###################:" + rdd.count())
      }
      // Store the new offsets
      storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
```
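To sanity-check that `storeOffsets` really wrote to ZooKeeper, the stored value can be read back with the same Curator API. The following is a minimal sketch, not part of the original program: it assumes the same connect string (`10.86.8.118:2181`) and the `/kafka/offsets/<group>/<topic>/<partition>` layout used above, and the group/topic/partition values are only examples.

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// Read back the offset stored for one partition and print it.
// Connect string, group, topic, and partition are placeholders taken from
// the example above; adjust them to your environment.
object ZkOffsetCheck {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()

    val path = "/kafka/offsets/zk_group/all_spider_data/0"
    if (client.checkExists().forPath(path) != null) {
      val offset = new String(client.getData.forPath(path)).toLong
      println(s"Stored offset for $path: $offset")
    } else {
      println(s"No offset stored yet at $path")
    }
    client.close()
  }
}
```

Note that the job itself takes the Spark master as its first argument (`setMaster(args(0))`), so pass e.g. `yarn` or `local[2]` on the command line when submitting.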
# Pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhonghong</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Use the _2.11 artifacts throughout: mixing _2.10 and _2.11 Spark modules
             against Scala 2.11.8 puts binary-incompatible duplicates on the classpath -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
        </dependency> -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency> -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
# Spark Streaming 2.1.1 version
```scala
package com.zsh.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkStreaming2 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("data").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.2.112:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "sss",
      "auto.offset.reset" -> "earliest",
      // "enable.auto.commit" -> (false: java.lang.Boolean)  // whether to commit offsets automatically
      "enable.auto.commit" -> (true: java.lang.Boolean),
      "auto.commit.interval.ms" -> (60 * 1000 + "") // auto-commit every 60 s
    )
    val topics = Array("test", "weixin_for_check")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      if (rdd.count() >= 1) {
        rdd.map(record => (record.key, record.offset(), record.value)).foreach(println)
      }
    })
    // stream.map(record => (record.key, record.value))
    ssc.start()
    ssc.awaitTermination()
  }
}
```
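The 2.1.1 example above relies on the consumer's auto-commit (`enable.auto.commit = true`, committed every 60 s), so offsets can be committed before a batch has actually been processed. If you would rather commit only after processing, the 0-10 integration can also commit the stream's own offset ranges back to Kafka. The snippet below is an alternative sketch, not part of the original code: it reuses the `stream` value defined above and assumes `enable.auto.commit` has been switched back to `false`.

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Commit offsets back to Kafka only after the batch has been handled.
// Assumes `stream` was created as above, but with "enable.auto.commit" -> false.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  if (!rdd.isEmpty()) {
    rdd.map(record => (record.key, record.offset(), record.value)).foreach(println)
  }
  // Asynchronously commit the consumed ranges to Kafka's own offset storage
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

As with any at-least-once setup, the commit happens after processing, so downstream output should tolerate occasional replays.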
# pom.xml (2.1.1)
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>2.1.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <!-- <version>1.6.2</version> -->
    <version>2.1.1</version>
</dependency>
<!-- Also required for the org.apache.spark.streaming.kafka010 imports used in the code above -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
    <version>2.1.1</version>
</dependency>
```