Spark SQL之dataframe数据保存

Spark SQL之dataframe数据保存

1. 背景

  1. Spark SQL作为处理结构化数据的功能模块,本身支持SQL形式使用功能,内部也做了相对RDD更加高的抽象
  2. DataFrame也是一个抽象数据集合,但对比RDD多了schema数据结构化信息,可以将DataFrame看成是RDD+schema信息

2. dataframe数据保存类型

环境准备

  • Idea2020
  • jdk 1.8
  • scala 2.12.12
  • maven 3.6.3
  • pom文件
<!-- 定义了一些常量 -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.1</spark.version>
        <hbase.version>2.2.5</hbase.version>
        <hadoop.version>3.2.1</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- 导入scala的依赖 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <!-- 编译时会引入依赖,打包是不引入依赖 -->
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <!-- 编译时会引入依赖,打包是不引入依赖 -->
            <!--            <scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 打jar插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.1 JDBC保存到mysql

def main(args: Array[String]): Unit = {

    // 先读取数据,获取datafrmae
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("DataFrame_SaveToMySQL")
      .getOrCreate()

    // 将结果通过jdbc的方式写入到mysql
    val sparkContext: SparkContext = sparkSession.sparkContext

    val rdd1: RDD[(String, (String, Int))] = sparkContext.makeRDD(List(("name", ("xixi", 10)), ("name", ("haha", 20)), ("name", ("licha", 30))))

    // 先创建RDD,注意需要转换为Row的最小单元
    val rdd2: RDD[Row] = rdd1.map(ele => {
      Row(ele._2._1, ele._2._2)
    })

    // 这是schema信息对象
    val structType: StructType = StructType(List(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)
    ))

    // 结合RDD,schema信息,创建dataframe
    val dataFrame: DataFrame = sparkSession.createDataFrame(rdd2, structType)

    dataFrame.show()

    // 往mysql中写入数据,注意如果没有表,可以使用jdbc参数,createtableifnotexists来指定,但这样以来就会没有主键
    // 最好还是先创建好表格之后再写入数据,这样可以有主键,也更符合实际生产中流程和要求
    dataFrame.createTempView("v_user")

    val dataFrame1: DataFrame = sparkSession.sql("select name, age from v_user where age > 5")

    // 注意实际生产中,一般不会直接使用root账号和密码,一般都是更低权限的用户,可以读写,但无法删除数据
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    dataFrame1
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/db_demo1?createTableIfNotExists=true&characterEncoding=utf-8", "tb_user_dataframe2", properties)


    sparkSession.close()
  }

2.2 保存到parquet文件(还可以是json、csv等支持的文件格式)

def main(args: Array[String]): Unit = {

    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Dataframe_SaveToParquet")
      .master("local")
      .getOrCreate()

    val structType: StructType = StructType(List(
      StructField("start_ip", DataTypes.StringType, false),
      StructField("end_ip", DataTypes.StringType, false),
      StructField("start_ip_num", DataTypes.LongType, false),
      StructField("end_ip_num", DataTypes.LongType, false),
      StructField("continent", DataTypes.StringType, false),

      StructField("country", DataTypes.StringType, false),
      StructField("province", DataTypes.StringType, true),
      StructField("city", DataTypes.StringType, true),
      StructField("district", DataTypes.StringType, true),
      StructField("operator", DataTypes.StringType, true),

      StructField("operator_code", DataTypes.IntegerType, true),
      StructField("country_en", DataTypes.StringType, true),
      StructField("country_code", DataTypes.StringType, true),
      StructField("longitude", DataTypes.DoubleType, true),
      StructField("latitude", DataTypes.DoubleType, true)
    ))

    // 1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
    //  E:\DOITLearning\12.Spark\ip_location_dict.txt
    val dataFrame: DataFrame = sparkSession
      .read
      .option("delimiter", "|")
      .schema(structType)
      .csv("E:\\DOITLearning\\12.Spark\\ip_location_dict.txt")

    // 可以对数据做重新分区
    dataFrame.repartition(2)
      .write
      .mode(SaveMode.Append)
      .parquet("E:\\DOITLearning\\12.Spark\\parquet_out2")

    sparkSession.close()
  }

2.3 总结

  1. 注意,DataFrame是一个抽象数据集合,类似于数据库中的表table,因为数据变成结构化之后,更加有利于查询和分析操作
  2. 当使用dataframe做数据分析操作处理之后,结果需要保存。结果保存大致可以分为两类,一种是保存为文件,因为spark本身支持多种文件格式,可以直接保存为对应格式,并且支持本地文件和网络文件系统;一种是保存到各个数据库中,这个可以通过JDBC或ODBC的方式来保存,这样就不局限于MySQL一种数据库。
  3. 当dataFrame的数据计算出来之后,可以作为下一个操作的输入数据源,也可以是最终结果最后保存。

版权声明:本文为xiaohu21原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>