# Reading MySQL with spark-shell and writing to HDFS
# Start spark-shell
spark-shell \
--executor-memory 8g \
--total-executor-cores 4 \
--jars /var/lib/hadoop-hdfs/jast/test/mysql-connector-java-5.1.20.jar \
--driver-class-path /var/lib/hadoop-hdfs/jast/test/mysql-connector-java-5.1.20.jar
# Run the code
// Connect to MySQL and read the table
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://10.248.111.11:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "tableName", "user" -> "root", "password" -> "rootpwd")).load()
mysqlDF.dtypes // inspect the column types; mysqlDF.printSchema works too
mysqlDF.repartition(10).write.parquet("/test/table/sss") // save to HDFS as Parquet
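The read above goes through a single JDBC connection, and `repartition(10)` only reshuffles the rows after they have been fetched. Spark's JDBC source can instead split the read into parallel range queries when it is given the `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` options. The sketch below only builds that option map. The helper `partitionedJdbcOptions`, the choice of `itemId` as the split column, and the 1 to 1000000 range are assumptions for illustration, not something the original setup states.

```scala
// Hypothetical helper: adds Spark's JDBC partitioning options to a base option map.
// The four option names are Spark's documented JDBC options; the helper is illustrative.
def partitionedJdbcOptions(
    base: Map[String, String],
    column: String,
    lower: Long,
    upper: Long,
    partitions: Int): Map[String, String] = {
  require(partitions > 0, "partitions must be positive")
  require(lower < upper, "lower bound must be below upper bound")
  base ++ Map(
    "partitionColumn" -> column,            // must be a numeric, date or timestamp column
    "lowerBound" -> lower.toString,         // bounds set the partition stride, they do not filter rows
    "upperBound" -> upper.toString,
    "numPartitions" -> partitions.toString  // also caps the number of concurrent JDBC connections
  )
}

// Same connection settings as the read above.
val baseOptions = Map(
  "url" -> "jdbc:mysql://10.248.111.11:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "tableName",
  "user" -> "root",
  "password" -> "rootpwd"
)

// Assumed values: itemId as the split column, ids roughly between 1 and 1000000.
val partitionedOptions = partitionedJdbcOptions(baseOptions, "itemId", 1L, 1000000L, 10)
```

In spark-shell the map would be passed as `spark.read.format("jdbc").options(partitionedOptions).load()`. Rows outside the bounds are still read; they simply land in the first or last partition.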
Running mysqlDF.dtypes or mysqlDF.printSchema shows the column types. Spark derives these types automatically from the MySQL column definitions when it reads the table.
scala> mysqlDF.printSchema
root
|-- accountId: long (nullable = false)
|-- topic: string (nullable = false)
|-- liveId: long (nullable = false)
|-- buyCount: string (nullable = true)
|-- goodsIndex: long (nullable = false)
|-- itemH5TaokeUrl: string (nullable = true)
|-- itemId: long (nullable = false)
|-- itemName: string (nullable = true)
|-- itemPic: string (nullable = true)
|-- itemPrice: double (nullable = true)
|-- itemUrl: string (nullable = true)
|-- ishot: string (nullable = false)
|-- bizdate: string (nullable = true)
scala> mysqlDF.dtypes
res23: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,DoubleType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))
To change the type of a single column, run the following code. Taking itemPrice as the example, this converts it from double to string.
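import spark.implicits._ // enables the $"column" syntax and RDD-to-DataFrame conversions
import org.apache.spark.sql.types.DataTypes
val sb = mysqlDF.withColumn("itemPrice", $"itemPrice".cast(DataTypes.StringType))
// The same conversion can be written with selectExpr
val p = mysqlDF.selectExpr("cast(itemPrice as string) itemPrice_bieming", "itemId", "itemName") // list every column you need; itemPrice_bieming is just an alias
p.printSchema() // check the schema; the cast shows up here as well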
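With `selectExpr` every column to keep has to be listed by hand, which gets tedious on a wide table. One possible approach is to generate the expression list from the column names and cast only the columns that need it. The helper `castExprs` below is a hypothetical name, not part of Spark, and the sketch assumes the target types are valid Spark SQL type names.

```scala
// Hypothetical helper: turns a column list plus a map of target types into
// selectExpr strings, casting only the columns named in `casts`.
// Backticks quote the identifiers so unusual column names still parse.
def castExprs(columns: Seq[String], casts: Map[String, String]): Seq[String] =
  columns.map { name =>
    casts.get(name) match {
      case Some(sqlType) => s"cast(`$name` as $sqlType) as `$name`"
      case None          => s"`$name`"
    }
  }

// Three columns from the schema above, two of them cast to string.
val exprs = castExprs(
  Seq("itemId", "itemName", "itemPrice"),
  Map("itemPrice" -> "string", "itemId" -> "string")
)
```

In spark-shell this could be applied to the whole table with `mysqlDF.selectExpr(castExprs(mysqlDF.columns.toSeq, Map("itemPrice" -> "string")): _*)`, which keeps every column and its original name.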
Check the types: itemPrice has changed from DoubleType to StringType.
scala> mysqlDF.dtypes
res23: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,DoubleType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))
scala> sb.dtypes
res24: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,StringType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))
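The cast can also be moved to read time. According to the Spark documentation, the JDBC source accepts a `customSchema` option (Spark 2.3 and later) that overrides the automatic type mapping for the listed columns and leaves the rest on the default mapping. The sketch below only assembles that option. The helper `customSchema` is a hypothetical name, and whether a given override is accepted depends on the Spark version and the JDBC driver, so treat it as something to try rather than a guaranteed equivalent of the cast above.

```scala
// Hypothetical helper: formats column/type pairs in the
// "col1 TYPE1, col2 TYPE2" form that the customSchema option expects.
def customSchema(overrides: Seq[(String, String)]): String =
  overrides.map { case (column, sqlType) => s"$column $sqlType" }.mkString(", ")

// Same connection settings as the original read, plus the type override.
val jdbcOptions = Map(
  "url" -> "jdbc:mysql://10.248.111.11:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "tableName",
  "user" -> "root",
  "password" -> "rootpwd"
)

// Ask Spark to read itemPrice as a string instead of a double.
val readOptions = jdbcOptions + ("customSchema" -> customSchema(Seq("itemPrice" -> "STRING")))
```

In spark-shell, `spark.read.format("jdbc").options(readOptions).load().printSchema` should then report itemPrice as string if the override is honored.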
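Write the data into a Hive table, continuing with the sb variable from above.
// Write the data to HDFS; note that this uses a temporary directory
sb.repartition(10).write.parquet("/test/db_test/tableName_temp/date=test")
// Load the data into the table. LOAD DATA moves the files from the temporary directory into the table's own data directory
sql("load data inpath 'hdfs://nameservice1/test/db_test/tableName_temp/date=test' into table source_taobao_live_product_now_new")
// Inspect the data
sql("select * from source_taobao_live_product_now_new limit 1").show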
Last updated: 2023/03/10, 17:30:33