Spark SQL之dataframe数据保存
Spark SQL之dataframe数据保存
1. 背景
- Spark SQL作为处理结构化数据的功能模块,本身支持SQL形式使用功能,内部也做了相对RDD更加高的抽象
- DataFrame也是一个抽象数据集合,但对比RDD多了schema数据结构化信息,可以将DataFrame看成是RDD+schema信息
2. dataframe数据保存类型
环境准备
- Idea2020
- jdk 1.8
- scala 2.12.12
- maven 3.6.3
- pom文件
<!-- 定义了一些常量 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.10</scala.version>
<spark.version>3.0.1</spark.version>
<hbase.version>2.2.5</hbase.version>
<hadoop.version>3.2.1</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<!-- 编译时会引入依赖,打包是不引入依赖 -->
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<!-- 编译时会引入依赖,打包是不引入依赖 -->
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 编译java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
2.1 JDBC保存到mysql
def main(args: Array[String]): Unit = {
// 先读取数据,获取datafrmae
val sparkSession: SparkSession = SparkSession.builder()
.master("local")
.appName("DataFrame_SaveToMySQL")
.getOrCreate()
// 将结果通过jdbc的方式写入到mysql
val sparkContext: SparkContext = sparkSession.sparkContext
val rdd1: RDD[(String, (String, Int))] = sparkContext.makeRDD(List(("name", ("xixi", 10)), ("name", ("haha", 20)), ("name", ("licha", 30))))
// 先创建RDD,注意需要转换为Row的最小单元
val rdd2: RDD[Row] = rdd1.map(ele => {
Row(ele._2._1, ele._2._2)
})
// 这是schema信息对象
val structType: StructType = StructType(List(
StructField("name", DataTypes.StringType, false),
StructField("age", DataTypes.IntegerType, false)
))
// 结合RDD,schema信息,创建dataframe
val dataFrame: DataFrame = sparkSession.createDataFrame(rdd2, structType)
dataFrame.show()
// 往mysql中写入数据,注意如果没有表,可以使用jdbc参数,createtableifnotexists来指定,但这样以来就会没有主键
// 最好还是先创建好表格之后再写入数据,这样可以有主键,也更符合实际生产中流程和要求
dataFrame.createTempView("v_user")
val dataFrame1: DataFrame = sparkSession.sql("select name, age from v_user where age > 5")
// 注意实际生产中,一般不会直接使用root账号和密码,一般都是更低权限的用户,可以读写,但无法删除数据
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
dataFrame1
.write
.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost:3306/db_demo1?createTableIfNotExists=true&characterEncoding=utf-8", "tb_user_dataframe2", properties)
sparkSession.close()
}
2.2 保存到parquet文件(还可以是json、csv等支持的文件格式)
def main(args: Array[String]): Unit = {
val sparkSession: SparkSession = SparkSession.builder()
.appName("Dataframe_SaveToParquet")
.master("local")
.getOrCreate()
val structType: StructType = StructType(List(
StructField("start_ip", DataTypes.StringType, false),
StructField("end_ip", DataTypes.StringType, false),
StructField("start_ip_num", DataTypes.LongType, false),
StructField("end_ip_num", DataTypes.LongType, false),
StructField("continent", DataTypes.StringType, false),
StructField("country", DataTypes.StringType, false),
StructField("province", DataTypes.StringType, true),
StructField("city", DataTypes.StringType, true),
StructField("district", DataTypes.StringType, true),
StructField("operator", DataTypes.StringType, true),
StructField("operator_code", DataTypes.IntegerType, true),
StructField("country_en", DataTypes.StringType, true),
StructField("country_code", DataTypes.StringType, true),
StructField("longitude", DataTypes.DoubleType, true),
StructField("latitude", DataTypes.DoubleType, true)
))
// 1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
// E:\DOITLearning\12.Spark\ip_location_dict.txt
val dataFrame: DataFrame = sparkSession
.read
.option("delimiter", "|")
.schema(structType)
.csv("E:\\DOITLearning\\12.Spark\\ip_location_dict.txt")
// 可以对数据做重新分区
dataFrame.repartition(2)
.write
.mode(SaveMode.Append)
.parquet("E:\\DOITLearning\\12.Spark\\parquet_out2")
sparkSession.close()
}
2.3 总结
- 注意,DataFrame是一个抽象数据集合,类似于数据库中的表table,因为数据变成结构化之后,更加有利于查询和分析操作
- 当使用dataframe做数据分析操作处理之后,结果需要保存。结果保存大致可以分为两类,一种是保存为文件,因为spark本身支持多种文件格式,可以直接保存为对应格式,并且支持本地文件和网络文件系统;一种是保存到各个数据库中,这个可以通过JDBC或ODBC的方式来保存,这样就不局限于MySQL一种数据库。
- 当dataFrame的数据计算出来之后,可以作为下一个操作的输入数据源,也可以是最终结果最后保存。
版权声明:本文为xiaohu21原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。