# Reading HBase and Writing to Hive with Spark
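The complete job below scans the HBase table `crawl:weibo_user` through `TableInputFormat`, filters rows on the `fn:verified_type` column, maps each `Result` into a case class, registers the data as a temporary view, and inserts it into the Hive table `dw_crawler.test_table_0724` via Spark SQL.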
```scala
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WeiBoAccountFilter {
  val tableName = "crawl:weibo_user"
  val insertTableName = "crawl:weibo_user_v"
  val hBaseConf = HBaseConfiguration.create()
  val conn = ConnectionFactory.createConnection(hBaseConf)
  val hbaseTable = conn.getTable(TableName.valueOf(tableName))
  val insertHbaseTable = conn.getTable(TableName.valueOf(insertTableName))

  case class WeiboUserSchemaClass(
      biFollowersCount: String, city: String, created_at: String,
      description: String, experience: String, followers_count: String,
      friends_count: String, name: String, profileImageUrl: String,
      province: String, statuses_count: String, uid: String, url: String,
      verified: String, verified_reason: String, verified_type: String,
      verified_type_ext: String)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      // .master("local[2]")
      .appName("WeiBoAccount-Verified")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport() // required when reading Hive tables
      .getOrCreate()
    // conf.set("spark.yarn.jars", "hdfs://nameservice1/spark/jars/*.jar")
    // conf.setMaster("local[2]")
    // conf.setMaster("yarn-client")
    // @transient lazy
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("INFO")
    hBaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // hBaseConf.set(TableInputFormat.SCAN_TIMESTAMP, "120000")
    hBaseConf.setInt("hbase.rpc.timeout", 200000)
    hBaseConf.setInt("hbase.client.operation.timeout", 200000)
    hBaseConf.setInt("hbase.client.scanner.timeout.period", 200000)
    val scan = new Scan()
    scan.withStartRow(Bytes.toBytes("0000"))
    scan.withStopRow(Bytes.toBytes("0001"))
    // scan.setCacheBlocks(false) // whether to cache blocks on the server side; enabled by default (reads go memory -> cache -> disk)
    // scan.setBatch(100)         // max number of columns returned per call; unlimited by default, i.e. all columns are returned
    // scan.setCaching(1000)      // rows fetched from the server per RPC; defaults to the value in the config file
    // serialize the Scan into the string form TableInputFormat expects
    val scan_str = convertScanToString(scan)
    hBaseConf.set(TableInputFormat.SCAN, scan_str)
    hBaseConf.set(TableInputFormat.SHUFFLE_MAPS, "true")
    // println("hbase.mapreduce.scan.timestamp:" + hBaseConf.get("hbase.mapreduce.scan.timestamp"))
    // val rdd = sc.newAPIHadoopRDD(hBaseConf, classOf[HBaseInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    val rdd = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    import spark.implicits._ // `spark` here is the SparkSession above, not the Spark package
    val value: RDD[WeiboUserSchemaClass] = rdd.map(convertHive).filter(_ != null)
    value.foreach(println)
    // Convert to a DataFrame and register it as a temp view (a Dataset would work too).
    // The repartition count determines both the number of files written to Hive and the
    // write parallelism; raising or lowering it appropriately can improve performance.
    val tempDS = value.repartition(10).toDF()
    tempDS.createTempView("test_table") // register the temp view for SQL queries
    spark.sql("desc test_table").show(false)
    val frame = spark.sql("select `name`,uid,url from test_table")
    // frame.write.parquet("xxx") // save to HDFS
    spark.sql("select count(1) from test_table").show(false)
    // spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS test_table_0724(biFollowersCount String ,city String ,created_at String ,description String ,experience String ,followers_count String ,friends_count String ,name String ,profileImageUrl String ,province String ,statuses_count String ,uid String ,url String ,verified String ,verified_reason String ,verified_type String ,verified_type_ext String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY')")
    // write into the Hive table
    spark.sql("INSERT INTO dw_crawler.test_table_0724 SELECT biFollowersCount,city,created_at,description,experience,followers_count,friends_count,name,profileImageUrl,province,statuses_count,uid,url,verified,verified_reason,verified_type,verified_type_ext FROM test_table")
    spark.sql("select count(1) from dw_crawler.test_table_0724").show(false)
    // rdd.count
    val num = rdd.count()
    println("count: " + num)
    // rdd.foreach(addRow) // insert into the HBase table
    println("closing all connections")
    insertHbaseTable.close()
    hbaseTable.close()
    conn.close()
  }
  var i: Int = 0

  def delete(rowkey: String): Unit = {
    val delete = new Delete(rowkey.getBytes)
    i = i + 1
    hbaseTable.delete(delete)
    println("deleted " + rowkey)
  }

  // Serialize the Scan into the Base64-encoded protobuf string that
  // TableInputFormat reads from its SCAN property.
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
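  // Note: HBase also ships an equivalent helper,
  // TableMapReduceUtil.convertScanToString(scan), which can be used instead of
  // the hand-rolled version above.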
  // Parse a qualifying row and insert it into the target HBase table.
  def addRow(tuple: (ImmutableBytesWritable, Result)): Unit = {
    val row = tuple._2.getRow
    val put = new Put(row)
    val verified_type = Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_type")))
    // skip rows whose verified_type is missing, "-1", "220", or not a plain integer
    if (verified_type != null &&
        !verified_type.equals("-1") &&
        !verified_type.equals("220") &&
        isIntByRegex(verified_type)) {
      addColumn(put, tuple._2, "fn", "biFollowersCount")
      addColumn(put, tuple._2, "fn", "city")
      addColumn(put, tuple._2, "fn", "created_at")
      addColumn(put, tuple._2, "fn", "description")
      addColumn(put, tuple._2, "fn", "experience")
      addColumn(put, tuple._2, "fn", "followers_count")
      addColumn(put, tuple._2, "fn", "friends_count")
      addColumn(put, tuple._2, "fn", "name")
      addColumn(put, tuple._2, "fn", "profileImageUrl")
      addColumn(put, tuple._2, "fn", "province")
      addColumn(put, tuple._2, "fn", "statuses_count")
      addColumn(put, tuple._2, "fn", "uid")
      addColumn(put, tuple._2, "fn", "url")
      addColumn(put, tuple._2, "fn", "verified")
      addColumn(put, tuple._2, "fn", "verified_reason")
      addColumn(put, tuple._2, "fn", "verified_type")
      addColumn(put, tuple._2, "fn", "verified_type_ext")
      insertHbaseTable.put(put)
    }
  }
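  // Note: addRow and delete touch the object-level connection and Table fields.
  // Invoking addRow from rdd.foreach still works because Scala objects are
  // initialized lazily in each executor JVM, so the HBase connection is created
  // on the executor rather than serialized from the driver.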
  // Convert an HBase Result into the schema case class; rows that fail the
  // verified_type filter map to null and are dropped by the caller.
  def convertHive(tuple: (ImmutableBytesWritable, Result)): WeiboUserSchemaClass = {
    val result = tuple._2
    val verified_type = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_type")))
    if (verified_type != null &&
        !verified_type.equals("-1") &&
        !verified_type.equals("220") &&
        isIntByRegex(verified_type)) {
      val biFollowersCount = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("biFollowersCount")))
      val city = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("city")))
      val created_at = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("created_at")))
      val description = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("description")))
      val experience = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("experience")))
      val followers_count = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("followers_count")))
      val friends_count = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("friends_count")))
      val name = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("name")))
      val profileImageUrl = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("profileImageUrl")))
      val province = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("province")))
      val statuses_count = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("statuses_count")))
      val uid = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("uid")))
      val url = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("url")))
      val verified = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified")))
      val verified_reason = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_reason")))
      val verified_type_ext = Bytes.toString(result.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_type_ext")))
      WeiboUserSchemaClass(biFollowersCount, city, created_at, description, experience,
        followers_count, friends_count, name, profileImageUrl, province,
        statuses_count, uid, url, verified, verified_reason, verified_type,
        verified_type_ext)
    } else {
      null
    }
  }
  def addColumn(put: Put, result: Result, fn: String, column: String): Unit = {
    put.addColumn(Bytes.toBytes(fn), Bytes.toBytes(column),
      result.getValue(Bytes.toBytes(fn), Bytes.toBytes(column)))
  }

  def isIntByRegex(s: String) = {
    val pattern = """^(\d+)$""".r
    s match {
      case pattern(_*) => true
      case _ => false
    }
  }
}
```
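If the target Hive table already exists, the `INSERT INTO ... SELECT` above can also be written with the DataFrame writer API. A minimal sketch, assuming the same `dw_crawler.test_table_0724` table with columns in the same order as the case class fields:

```scala
// Minimal sketch (assumes dw_crawler.test_table_0724 already exists and its
// column order matches). insertInto resolves columns by position, so the
// select below pins the order explicitly.
tempDS.select(
  "biFollowersCount", "city", "created_at", "description", "experience",
  "followers_count", "friends_count", "name", "profileImageUrl", "province",
  "statuses_count", "uid", "url", "verified", "verified_reason",
  "verified_type", "verified_type_ext"
).write.mode("append").insertInto("dw_crawler.test_table_0724")
```

This avoids repeating the column list inside a SQL string, and the job fails at analysis time if the column count no longer matches the table.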