Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
  • Scala-SparkStreaming 2.2.0 kafka0.10(生产1.0)
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2025-07-01
目录

Scala SparkStreaming 2.2.0 kafka0.10生产1.0

# Scala-SparkStreaming 2.2.0 kafka0.10(生产1.0)

[toc]

Spark 2.2 kafka0.10(api使用的0.10,实际生产kafka版本是1.0)

# 代码

package com.jast.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


import scala.collection.JavaConversions._
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkKafkaTest3 {


  // ZK client
  val client = {
    val client = CuratorFrameworkFactory
      .builder
      .connectString("10.86.8.118:2181")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    // .namespace("mykafka")
      .build()
    client.start()
    client
  }

  // offset 路径起始位置
  val Globe_kafkaOffsetPath = "/kafka/offsets"

  // 路径确认函数  确认ZK中路径存在,不存在则创建该路径
  def ensureZKPathExists(path: String)={

    if (client.checkExists().forPath(path) == null) {
      client.create().creatingParentsIfNeeded().forPath(path)
    }

  }


  // 保存 新的 offset
  def storeOffsets(offsetRange: Array[OffsetRange], groupName:String) = {

    for (o <- offsetRange){
      val zkPath = s"${Globe_kafkaOffsetPath}/${groupName}/${o.topic}/${o.partition}"
      ensureZKPathExists(zkPath)
      // 向对应分区第一次写入或者更新Offset 信息
      println("---Offset写入ZK------\nTopic:" + o.topic +", Partition:" + o.partition + ", Offset:" + o.untilOffset)
      println("保存路径:"+zkPath);
      client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
      println("写入成功")
    }
  }

  def getFromOffset(topic: Array[String], groupName:String):(Map[TopicPartition, Long], Int) = {

    // Kafka 0.8和0.10的版本差别,0.10 为 TopicPartition   0.8 TopicAndPartition
    var fromOffset: Map[TopicPartition, Long] = Map()

    val topic1 = topic(0).toString

    // 读取ZK中保存的Offset,作为Dstrem的起始位置。如果没有则创建该路径,并从 0 开始Dstream
    val zkTopicPath = s"${Globe_kafkaOffsetPath}/${groupName}/${topic1}"

    // 检查路径是否存在
    ensureZKPathExists(zkTopicPath)

    // 获取topic的子节点,即 分区
    val childrens = client.getChildren().forPath(zkTopicPath)

    // 遍历分区
    val offSets: mutable.Buffer[(TopicPartition, Long)] = for {
      p <- childrens
    }
      yield {

        // 遍历读取子节点中的数据:即 offset
        val offsetData = client.getData().forPath(s"$zkTopicPath/$p")
        // 将offset转为Long
        val offSet = java.lang.Long.valueOf(new String(offsetData)).toLong
        // 返回  (TopicPartition, Long)
        (new TopicPartition(topic1, Integer.parseInt(p)), offSet)
      }
    println(offSets.toMap)

    if(offSets.isEmpty){
      (offSets.toMap, 0)
    } else {
      (offSets.toMap, 1)
    }


  }

  //    if (client.checkExists().forPath(zkTopicPath) == null){
  //
  //      (null, 0)
  //    }
  //    else {
  //      val data = client.getData.forPath(zkTopicPath)
  //      println("----------offset info")
  //      println(data)
  //      println(data(0))
  //      println(data(1))
  //      val offSets = Map(new TopicPartition(topic1, 0) -> 7332.toLong)
  //      println(offSets)
  //      (offSets, 1)
  //    }
  //
  //  }

  def createMyZookeeperDirectKafkaStream(ssc:StreamingContext, kafkaParams:Map[String, Object], topic:Array[String],
                                         groupName:String ):InputDStream[ConsumerRecord[String, String]] = {

    // get offset  flag = 1  表示基于已有的offset计算  flag = 表示从头开始(最早或者最新,根据Kafka配置)
    val (fromOffsets, flag) = getFromOffset(topic, groupName)
    var kafkaStream:InputDStream[ConsumerRecord[String, String]] = null
    if (flag == 1){
      // 加上消息头
      //val messageHandler = (mmd:MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      println(fromOffsets)
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffsets))
      println(fromOffsets)
      println("中断后 Streaming 成功!")

    } else {
      kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))

      println("首次 Streaming 成功!")

    }
    kafkaStream
  }

  def main(args: Array[String]): Unit = {

    val processInterval = 5 //处理间隔时间
    val brokers = "10.86.8.153:9092"
    val topics = Array("all_spider_data")
    val conf = new SparkConf().setMaster(args(0)).setAppName("kafka checkpoint zookeeper")
    // kafka params
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zk_group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val ssc = new StreamingContext(conf, Seconds(processInterval))

    val messages = createMyZookeeperDirectKafkaStream(ssc, kafkaParams, topics, "zk_group")

    messages.foreachRDD((rdd) => {
      if (!rdd.isEmpty()){

        println("###################:"+rdd.count())
      }

      // 存储新的offset
      storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "zk_group")
    })

    ssc.start()
    ssc.awaitTermination()

  }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179

# Pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhonghong</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.0</spark.version>
        <scala.version>2.11.8</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--  <dependency>
              <groupId>org.apache.kafka</groupId>
              <artifactId>kafka-clients</artifactId>
              <version>0.8.2.0</version>
          </dependency>-->

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>

        <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.14</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


       <!-- <dependency>
                   <groupId>org.apache.kafka</groupId>
                   <artifactId>kafka-clients</artifactId>
                   <version>1.0.0</version>
               </dependency>-->

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

# Sparkstreaming 2.1.1版本

package com.zsh.spark.streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkStreaming2 {
	def main(args: Array[String]) {

		val conf =new SparkConf().setAppName("data").setMaster("local[2]")
				val ssc = new StreamingContext(conf, Seconds(2))
				val kafkaParams = Map[String, Object](
						"bootstrap.servers" -> "192.168.2.112:9092",
						"key.deserializer" -> classOf[StringDeserializer],
						"value.deserializer" -> classOf[StringDeserializer],
						"group.id" -> "sss",
						"auto.offset.reset" -> "earliest",
					//	"enable.auto.commit" -> (false: java.lang.Boolean)  是否自动提交offset
						"enable.auto.commit" -> (true: java.lang.Boolean),
						"auto.commit.interval.ms" -> (60*1000+"")//每隔60s自动提交一次
						)

				val topics = Array("test","weixin_for_check")
				val stream = KafkaUtils.createDirectStream[String, String](
						ssc,
						PreferConsistent,
						Subscribe[String, String](topics, kafkaParams)
						)
				stream.foreachRDD(rdd => {
					if (rdd.count() >= 1) {
						rdd.map(record => (record.key,record.offset(),record.value,record.value,record.value)).foreach(println)
					}
				})
				//				stream.map(record => (record.key, record.value))
				ssc.start()
				ssc.awaitTermination()
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# pom文件

<dependency>  
    <groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>2.1.1</version>
	<scope>provided</scope>
</dependency>
<dependency>
	<groupId>org.scala-lang</groupId>
	<artifactId>scala-library</artifactId>
	<version>${scala.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<!-- <version>1.6.2</version> -->
	<version>2.1.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
上次更新: 2025/07/01, 13:48:30
最近更新
01
TODO-Clickhouse-Explain查看执行计划
07-02
02
Ambari自定义服务开发-metainfo详细介绍
07-01
03
IoTDB服务安装教程-单机版
07-01
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式