一、SparkSQL

前面的文章中使用 RDD 进行数据的处理,优点是非常的灵活,但需要了解各个算子的场景,需要有一定的学习成本,而 SQL 语言是一个大家十分熟悉的语言,如果可以通过编写 SQL 而操作RDD,学习的成本便会大大降低,在大数据领域 SQL 已经是数一个非常重要的范式,在 Hadoop 生态圈中,我们可以通过 Hive 进而转换成 MapReduces 进行数据分析,在后起之秀的 Flink 中也有 FlinkSQL 来简化数据的操作。

SparkSQL 可以理解成是将 SQL 解析成:RDD + 优化 再执行。

SparkSQL 对比 Hive

SparkSQLHive计算方式基于 RDD 在内存计算转化为 MapReduces 需要磁盘IO读写计算引擎SparkMR、Spark、Tez性能快慢元数据无自身的元数据,可以与Hive metastore连接Hive metastore缓存表支持不支持视图支持支持ACID不支持支持(hive 0.14)分区支持支持分桶支持支持

SparkSQL 的适用场景

数据类型说明结构化数据有固定的 Schema ,例如:关系型数据库的表半结构化数据没有固定的 Schema,但是有结构,数据一般是自描述的,例如:JSON 数据

理解 DataFrame 和 DataSet

SparkSQL的数据抽象是 DataFrame 和 DataSet ,底层都是RDD。

DataFrame 可以理解为是一个分布式表,包括:RDD - 泛型 + Schema约束(指定了字段名和类型) + SQL操作 + 优化 。

DataSet 在 DataFrame 的基础上增加了泛型的概念。

例如:有文本数据,读取为 RDD 后,可以拥有如下数据:

1小明110@qq.com2小张120@qq.com3小王130@qq.com

如果转化为 DataFrame ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String1小明110@qq.com2小张120@qq.com3小王130@qq.com

如果转化为 DataSet ,那就就可以拥有下面数据:

ID:bigint姓名:String邮箱:String泛型1小明110@qq.comuser2小张120@qq.comuser3小王130@qq.comuser

DataSet跟DataFrame还是有挺大区别的,DataFrame开发都是写SQL,但是DataSet可以使用类似RDD的API。也可以理解成DataSet就是存了个数据类型的RDD 。

二、通过 RDD 使用SparkSQL

如果是使用 Scala 或 Java 语言开发,需要引入 SparkSQL 的依赖:

org.apache.spark

spark-sql_2.12

3.0.1

假如现在有如下文本文件,分别对应含义为:ID、名称、年龄、邮箱

1 小明 20 110.@qq.com

2 小红 29 120.@qq.com

3 李四 25 130.@qq.com

4 张三 30 140.@qq.com

5 王五 35 150.@qq.com

6 赵六 40 160.@qq.com

下面还是使用前面文章的方式读取文本为 RDD ,不过不同的是,我们将 RDD 转为 DataFrame 使用 SQL 的方式处理:

Scala:

object SQLRddScala {

case class User(id: Int, name: String, age: Int, email: String)

def main(args: Array[String]): Unit = {

//声明 SparkSession

val spark: SparkSession = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate()

//通过 SparkSession 获取 SparkContext

val sc = spark.sparkContext

//读取文件为 RDD

val text = sc.textFile("D://test/input1/")

//根据空格拆分字段

val rdd = text.map(_.split(" ")).map(s => User(s(0).toInt, s(1), s(2).toInt, s(3)))

//转化为 DataFrame,并指定 Schema

val dataFrame = spark.createDataFrame(rdd)

//打印 Schema

dataFrame.printSchema()

//查看数据

dataFrame.show()

//DSL 风格查询

dataFrame.select("id","name").filter("age >= 30").show()

//SQL 风格

//注册表

dataFrame.createOrReplaceTempView("user")

//执行 SQL 语言

spark.sql("select * from user where age >= 30").show()

//关闭资源

spark.stop()

}

}

Java:

public class SQLRddJava {

@Data

@AllArgsConstructor

@NoArgsConstructor

public static class User {

private Integer id;

private String name;

private Integer age;

private String email;

}

public static void main(String[] args) {

// 声明 SparkSession

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

// 通过 SparkSession 获取 SparkContext

JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

// 读取文件为 RDD

JavaRDD text = sc.textFile("D://test/input1/");

//根据空格拆分字段

JavaRDD rdd = text.map(s -> s.split(" ")).map(s -> new User(Integer.parseInt(s[0]), s[1], Integer.parseInt(s[2]), s[3]));

//转化为 DataFrame,并指定 Schema

Dataset dataFrame = spark.createDataFrame(rdd, User.class);

//打印 Schema

dataFrame.printSchema();

// 查看数据

dataFrame.show();

//DSL 风格查询

dataFrame.select("id","name").filter("age >= 30").show();

// SQL 风格

dataFrame.createOrReplaceTempView("user");

// 注册表

spark.sql("select * from user where age >= 30").show();

// 执行 SQL 语言

spark.stop();

}

}

Python:

from pyspark.sql import SparkSession

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

# 通过 SparkSession 获取 SparkContext

sc = spark.sparkContext

# 读取文件为 RDD

text = sc.textFile("D:/test/input1/")

# 根据空格拆分字段

rdd = text.map(lambda s: s.split(" "))

# 转化为 DataFrame,并指定 Schema

dataFrame = spark.createDataFrame(rdd, ["id", "name", "age", "email"])

# 打印 Schema

dataFrame.printSchema()

# 查看数据

dataFrame.show()

# DSL 风格查询

dataFrame.select(["id","name"]).filter("age >= 30").show()

# SQL 风格

# 注册表

dataFrame.createOrReplaceTempView("user")

# 执行 SQL 语言

spark.sql("select * from user where age >= 30").show()

#关闭资源

spark.stop()

打印的 Schema 信息: 全部数据内容:

DSL 查询结果:

SQL 查询结果:

三、多数据源交互

在 SparkSession 中可以通过: spark.read.格式(路径) 的方式, 获取 SparkSQL 中的外部数据源访问框架 DataFrameReader,DataFrameReader 有两种访问方式,一种是使用 load 方法加载,使用 format 指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc 等,这两种方式本质上一样,都是 load 的封装。

注意:如果使用 load 方法加载数据, 但是没有指定 format 的话, 默认是按照 Parquet 文件格式读取。

对于写数据SparkSQL 中增加了一个新的数据写入框架 DataFrameWriter ,同样也有两种使用方式,一种是使用 format 配合 save,还有一种是使用封装方法,例如 csv, json, saveAsTable 等,参数如下:

组件说明source写入目标, 文件格式等, 通过 format 方法设定mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter 向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode 方法设定extraOptions外部参数, 例如 JDBC 的 URL, 通过 options, option 设定partitioningColumns类似 Hive 的分区, 保存表的时候使用, 这个地方的分区不是 RDD 的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy 设定bucketColumnNames类似 Hive 的分桶, 保存表的时候使用, 通过 bucketBy 设定sortColumnNames用于排序的列, 通过 sortBy 设定

其中一个很重要的参数叫做 mode,表示指定的写入模式,可以传入Scala 对象表示或字符串表示,有如下几种方式:

Scala 对象表示字符串表示说明SaveMode.ErrorIfExists“error”将 DataFrame 保存到 source 时, 如果目标已经存在, 则报错SaveMode.Append“append”将 DataFrame 保存到 source 时, 如果目标已经存在, 则添加到文件或者 Table 中SaveMode.Overwrite“overwrite”将 DataFrame 保存到 source 时, 如果目标已经存在, 则使用 DataFrame 中的数据完全覆盖目标SaveMode.Ignore“ignore”将 DataFrame 保存到 source 时, 如果目标已经存在, 则不会保存 DataFrame 数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

注意:如果没有指定 format, 默认的 format 是 Parquet

1. 读写 CSV 格式

准备 CSV 文件:

Scala:

object SQLCSV {

def main(args: Array[String]): Unit = {

//声明 SparkSession

val spark: SparkSession = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate()

//读取 CSV

val csv = spark

.read

.schema("id int, name string, age int, email string")

.option("header", "true") //第一行为标题

.csv("D:/test/input1/test.csv")

csv.printSchema()

csv.show()

// SQL 操作

csv.createOrReplaceTempView("csv")

spark.sql("select * from csv where age >= 30").show()

//如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符

val csv1 = spark

.read

.schema("id int, name string, age int, email string")

.option("delimiter", " ")

.csv("D:/test/input1/test.txt")

csv1.printSchema()

csv1.show()

//写出CSV文件

csv1.write.mode(SaveMode.Overwrite).json("D:/test/output")

//写出查询结果

spark.sql("select * from csv where age <= 30")

.write.mode(SaveMode.Overwrite).csv("D:/test/output1")

spark.stop()

}

}

Java:

public class SQLCSVJava {

public static void main(String[] args) {

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

//读取 CSV

Dataset csv = spark.read()

.schema("id int, name string, age int, email string")

.option("header", "true") //第一行为标题

.csv("D:/test/input1/test.csv");

csv.printSchema();

csv.show();

// SQL 操作

csv.createOrReplaceTempView("csv");

spark.sql("select * from csv where age >= 30").show();

//如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符

Dataset csv1 = spark.read()

.schema("id int, name string, age int, email string")

.option("delimiter", " ") //第一行为标题

.csv("D:/test/input1/test.txt");

csv1.printSchema();

csv1.show();

//写出CSV文件

csv1.write().mode(SaveMode.Overwrite).json("D:/test/output");

//写出查询结果

spark.sql("select * from csv where age <= 30")

.write().mode(SaveMode.Overwrite).csv("D:/test/output1");

spark.close();

}

}

Python:

from pyspark.sql import SparkSession,DataFrameWriter

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

# 读取 CSV

csv = spark.read \

.schema("id int, name string, age int, email string") \

.option("header", "true") \

.csv("D:/test/input1/test.csv")

csv.printSchema()

csv.show()

# SQL 操作

csv.createOrReplaceTempView("csv")

spark.sql("select * from csv where age >= 30").show()

# 如果是txt文本,也可以根据 CSV 格式读取,不过需要指定分隔符

csv1 = spark.read \

.schema("id int, name string, age int, email string") \

.option("delimiter", " ") \

.csv("D:/test/input1/test.txt")

csv1.printSchema()

csv1.show()

# 写出CSV文件

csv1.write.mode("overwrite").json("D:/test/output")

# 写出查询结果

spark.sql("select * from csv where age <= 30").write.mode("overwrite").csv("D:/test/output1")

#关闭资源

spark.stop()

存储的 csv :

2. 读写Parquet 格式文件

先将上面 csv 文件转为 Parquet 文件:

//读取 CSV

Dataset csv = spark.read()

.schema("id int, name string, age int, email string")

.option("header", "true") //第一行为标题

.csv("D:/test/input1/test.csv");

// 转化为 Parquet 文件

csv.write().mode(SaveMode.Overwrite).parquet("D:/test/output3");

将该文件名修改为 test.parquet 方便下面测试:

读取 Parquet 格式文件:

Scala:

object SQLParquet {

def main(args: Array[String]): Unit = {

//声明 SparkSession

val spark: SparkSession = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate()

//读取 parquet

val parquet = spark.read.parquet("D:/test/output3/test.parquet")

parquet.printSchema()

parquet.show()

// SQL 操作

parquet.createOrReplaceTempView("parquet")

spark.sql("select * from parquet where age >= 30").show()

//写入 Parquet 的时候指定分区

parquet.write.mode(SaveMode.Overwrite).partitionBy("age").csv("D:/test/output5")

spark.stop()

}

}

Java

public class SQLParquetJava {

public static void main(String[] args) {

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

//读取 parquet

Dataset parquet = spark.read().parquet("D:/test/output3/test.parquet");

parquet.printSchema();

parquet.show();

// SQL 操作

parquet.createOrReplaceTempView("parquet");

spark.sql("select * from parquet where age >= 30").show();

//写入 Parquet 的时候指定分区

parquet.write().mode(SaveMode.Overwrite).partitionBy("age").parquet("D:/test/output5");

spark.close();

}

}

Python:

from pyspark.sql import SparkSession,DataFrameWriter

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

# 读取 parquet

parquet = spark.read.parquet("D:/test/output3/test.parquet")

parquet.printSchema()

parquet.show()

# SQL操作

parquet.createOrReplaceTempView("parquet")

spark.sql("select * from parquet where age >= 30").show()

# 写入Parquet的时候指定分区

parquet.write.mode("overwrite").partitionBy("age").csv("D:/test/output5")

#关闭资源

spark.stop()

SQL 查询结果:

输出目录:

3. 读写 JSON 格式文件

将上面CSV数据转化为 JSON:

//读取 CSV

Dataset csv = spark.read()

.schema("id int, name string, age int, email string")

.option("header", "true") //第一行为标题

.csv("D:/test/input1/test.csv");

// 转化为 JSON 文件

csv.write().mode(SaveMode.Overwrite).json("D:/test/output6");

读写 JSON 格式文件:

Scala:

object SQLJson {

def main(args: Array[String]): Unit = {

//声明 SparkSession

val spark: SparkSession = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate()

//读取 JSON

val json = spark.read.json("D:/test/output6/test.json")

json.printSchema()

json.show()

// SQL 操作

json.createOrReplaceTempView("parquet")

spark.sql("select * from parquet where age >= 30").show()

//写入 JSON

json.filter("age < 30 ").write.json("D:/test/output7")

spark.stop()

}

}

Java:

public class SQLJsonJava {

public static void main(String[] args) {

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

//读取 JSON

Dataset json = spark.read().json("D:/test/output6/test.json");

json.printSchema();

json.show();

// SQL 操作

json.createOrReplaceTempView("parquet");

spark.sql("select * from parquet where age >= 30").show();

//写入 JSON

json.filter("age < 30 ").write().json("D:/test/output7");

spark.close();

}

}

Python:

from pyspark.sql import SparkSession,DataFrameWriter

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

# 读取JSON

json = spark.read.json("D:/test/output6/test.json")

json.printSchema()

json.show()

# SQL操作

json.createOrReplaceTempView("parquet")

spark.sql("select * from parquet where age >= 30").show()

# 写入JSON

json.filter("age < 30 ").write.json("D:/test/output7")

#关闭资源

spark.stop()

4. 读写 MySQL 格式文件

Scala 和 Scala 项目需要引入 MySQL 的依赖:

mysql

mysql-connector-java

8.0.22

创建表:

CREATE TABLE `user` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`name` varchar(255) DEFAULT NULL,

`age` int(11) DEFAULT NULL,

`email` varchar(255) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;

写入测试数据:

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (1, '小明', 20, '110.@qq.com');

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (2, '小红', 29, '120.@qq.com');

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (3, '李四', 25, '130.@qq.com');

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (4, '张三', 30, '140.@qq.com');

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (5, '王五', 35, '150.@qq.com');

INSERT INTO `testdb`.`user`(`id`, `name`, `age`, `email`) VALUES (6, '赵六', 40, '160.@qq.com');

读写 MySQL 格式文件:

Scala:

object SQLMySql {

def main(args: Array[String]): Unit = {

//声明 SparkSession

val spark: SparkSession = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate()

//读取 mysql

val prop = new Properties

prop.setProperty("user", "root")

prop.setProperty("password", "root")

val user = spark.read.jdbc(

"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",

"user",

prop)

user.printSchema()

user.show()

// SQL 操作

user.createOrReplaceTempView("user")

spark.sql("select * from user where age >= 30").show()

//写入表信息,没有表自动创建

user.filter("age < 30 ")

.write.mode(SaveMode.Overwrite).jdbc(

"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",

"user2",

prop)

spark.stop()

}

}

Java:

public class SQLMySqlJava {

public static void main(String[] args) {

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

//读取 mysql

Properties prop = new Properties();

prop.setProperty("user","root");

prop.setProperty("password","root");

Dataset user = spark.read().jdbc(

"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",

"user",

prop);

user.printSchema();

user.show();

// SQL 操作

user.createOrReplaceTempView("user");

spark.sql("select * from user where age >= 30").show();

//写入表信息,没有表自动创建

user.filter("age < 30 ").write().mode(SaveMode.Overwrite).jdbc(

"jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",

"user2",

prop

);

spark.close();

}

}

Python 使用 Python 读写 MySql 需要将 MySql的驱动放到 java 安装目录的 jre\lib\ext目录下:

from pyspark.sql import SparkSession, SQLContext

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

properties = {'user': 'root', 'password': 'root'}

url = "jdbc:mysql://127.0.0.1:3306/testdb?characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

user = spark.read.jdbc(url=url, table="user", properties=properties)

user.printSchema()

user.show()

# SQL操作

user.createOrReplaceTempView("user")

spark.sql("select * from user where age >= 30").show()

# 写入表信息,没有表自动创建

user.filter("age < 30 ").write.mode("overwrite").jdbc(url=url, table="user2", properties=properties)

# 关闭资源

spark.stop()

SQL 查询结果: MySQL 表信息:

四、SparkOnHive

在 Hive 中可以将运算引擎改为 Spark ,也就是 HiveONSpark 不过这种方式严重依赖 Hive ,已经淘汰,而 SparkOnHvie 是在 SparkSQL 诞生之后提出的,仅仅使用 Hive 的元数据(库、表、字段、位置等),剩下的全部由 Spark 进行语法解析、物理执行计划、SQL优化等。

由于远程模式下 Hive 的元数据是由 metastore 服务控制,因此确保metastore 服务正常启动,如果对此不了解,可以参考下面文章:

https://xiaobichao.blog.csdn.net/article/details/127717080

注意:spark3.0.1整合hive要求hive版本>=2.3.7

Scala 和 Java 项目需要引入 spark-hive 的依赖:

org.apache.spark

spark-hive_2.12

3.0.1

Scala:

object SparkOnHive {

def main(args: Array[String]): Unit = {

System.setProperty("HADOOP_USER_NAME", "root")

val spark = SparkSession.builder

.appName("sparksql")

.master("local[*]")

// 实际开发中可以根据集群规模调整大小,默认200

.config("spark.sql.shuffle.partitions", "8")

// 指定 Hive 数据库在 HDFS 上的位置

.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")

// hive metastore 的地址

.config("hive.metastore.uris", "thrift://node1:9083")

// 开启对hive语法的支持

.enableHiveSupport

.getOrCreate

// 查询全部数据库

spark.sql("show databases").show()

// 使用 bxc 库

spark.sql("use bxc").show()

// 查询全部表

spark.sql("show tables").show()

// 创建表

spark.sql("create table if not exists `user2`(" +

" id int comment 'ID'," +

" name string comment '名称'," +

" age int comment '年龄'," +

" email string comment '邮箱'" +

") comment '用户表'" +

"row format delimited " +

"fields terminated by ',' " +

"lines terminated by '\n' ").show()

spark.sql("show tables").show()

//查询数据

spark.sql("select * from `user`").show()

spark.stop()

}

}

Java

public class SparkOnHiveJava {

public static void main(String[] args) {

System.setProperty("HADOOP_USER_NAME","root");

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

// 实际开发中可以根据集群规模调整大小,默认200

.config("spark.sql.shuffle.partitions", "8")

// 指定 Hive 数据库在 HDFS 上的位置

.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")

// hive metastore 的地址

.config("hive.metastore.uris", "thrift://node1:9083")

// 开启对hive语法的支持

.enableHiveSupport()

.getOrCreate();

// 查询全部数据库

spark.sql("show databases").show();

// 使用 bxc 库

spark.sql("use bxc").show();

// 查询全部表

spark.sql("show tables").show();

// 创建表

spark.sql(

"create table if not exists `user2`(" +

" id int comment 'ID'," +

" name string comment '名称'," +

" age int comment '年龄'," +

" email string comment '邮箱'" +

") comment '用户表'" +

"row format delimited " +

"fields terminated by ',' " +

"lines terminated by '\n' ").show();

// 查询全部表

spark.sql("show tables").show();

//查询数据

spark.sql("select * from `user`").show();

spark.stop();

}

}

Python:

from pyspark.sql import SparkSession, SQLContext

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder\

.appName('sparksql')\

.master("local[*]") \

.config("spark.sql.shuffle.partitions", "8") \

.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse") \

.config("hive.metastore.uris", "thrift://node1:9083") \

.enableHiveSupport() \

.getOrCreate()

# 查询全部数据库

spark.sql("show databases").show()

# 使用bxc库

spark.sql("use bxc").show()

# 查询全部表

spark.sql("show tables").show()

# 创建表

spark.sql("create table if not exists `user2`(" +

" id int comment 'ID'," +

" name string comment '名称'," +

" age int comment '年龄'," +

" email string comment '邮箱'" +

") comment '用户表'" +

"row format delimited " +

"fields terminated by ',' " +

"lines terminated by '\n' ").show()

spark.sql("show tables").show()

# 查询数据

spark.sql("select * from `user`").show()

spark.stop()

查看所有库: 查看全部表: 查询表信息:

五、SparkOnES

创建测试索引,向 ES 发送 PUT 请求:

PUT /user

{

"settings": {},

"mappings": {

"properties": {

"name": {

"type": "text",

"index": true,

"analyzer": "pinyin"

},

"sex": {

"type": "text",

"index": true,

"analyzer": "ik_smart"

},

"age": {

"type": "long",

"index": false

}

}

}

}

写入几条测试数据:

POST /user/_doc

{"name":"张三","age":18,"sex":"男"}

{"name":"李四","age":20,"sex":"男"}

{"name":"王五","age":30,"sex":"女"}

{"name":"赵六","age":40,"sex":"男"}

{"name":"小王","age":60,"sex":"女"}

如果是 Scala 和 Java 项目需要引入 ES_Spark 的依赖包:

org.elasticsearch

elasticsearch-spark-30_2.12

7.17.5

Scala:

object SparkOnES {

def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession

.builder()

.appName("SparkOnES")

.master("local[*]")

.getOrCreate()

val options = Map(

"es.nodes" -> "192.168.40.176",

"es.port" -> "9200",

"pushdown" -> "true",

"es.nodes.wan.only" -> "true",

"es.update.retry.on.conflict" -> "3",

"es.mapping.date.rich" -> "false",

"es.index.auto.create" -> "true",

"es.input.max.docs.per.partition" -> "5000000")

val esDataSet = spark.read

.format("org.elasticsearch.spark.sql")

.options(options)

.load("user")

esDataSet.printSchema()

esDataSet.limit(20).show()

esDataSet.createOrReplaceTempView("user")

spark.sql("select * from user where age > 30 ").show()

// 写入数据

esDataSet.filter("age > 30")

.write.options(options)

.format("org.elasticsearch.spark.sql")

.options(options).mode(SaveMode.Overwrite)

.save("user2")

spark.stop()

}

}

Java:

public class SparkOnESJava {

public static void main(String[] args) {

SparkSession spark = SparkSession

.builder()

.appName("sparksql")

.master("local[*]")

.getOrCreate();

Map options = new HashMap<>();

options.put("es.nodes","192.168.40.176");

options.put("es.port","9200");

options.put("pushdown","true");

options.put("es.nodes.wan.only","true");

options.put("es.update.retry.on.conflict","3");

options.put("es.mapping.date.rich","false");

options.put("es.index.auto.create","true");

options.put("es.input.max.docs.per.partition","5000000");

Dataset esDataSet = spark.read()

.format("org.elasticsearch.spark.sql")

.options(options)

.load("user");

esDataSet.printSchema();

esDataSet.limit(20).show();

esDataSet.createOrReplaceTempView("user");

spark.sql("select * from user where age > 30 ").show();

// 写入数据

esDataSet.filter("age > 30")

.write().options(options)

.format("org.elasticsearch.spark.sql")

.options(options).mode(SaveMode.Overwrite)

.save("user2");

spark.stop();

}

}

python

python 需要依赖于 elasticsearch-spark 的 jar 包,可以放在 Spark 的依赖中,或使用 spark-submit 时带上 –jars 指定该 jar包位置。

from pyspark.sql import SparkSession

import findspark

if __name__ == '__main__':

findspark.init()

# 声明 SparkSession

spark = SparkSession.builder.appName('sparksql').master("local[*]").getOrCreate()

options = {

"es.nodes": "192.168.40.176",

"es.port": "9200",

"pushdown": "true",

"es.nodes.wan.only": "true",

"es.update.retry.on.conflict": "3",

"es.mapping.date.rich": "false",

"es.index.auto.create": "true",

"es.input.max.docs.per.partition": "5000000"

}

esDataSet = spark.read \

.format("org.elasticsearch.spark.sql") \

.options(**options) \

.load("user")

esDataSet.printSchema()

esDataSet.limit(20).show()

esDataSet.createOrReplaceTempView("news")

spark.sql("select * from news where remark like '%贵州省%'").show()

# 写入数据

esDataSet.filter("age > 30").write \

.format("org.elasticsearch.spark.sql") \

.options(**options).mode("overwrite").save("user2")

spark.stop()

Schema 信息: 索引数据:

筛选后数据:

ES 中写入数据:

更多参数,可以参考官方的介绍:

https://www.elastic.co/guide/en/elasticsearch/hadoop/8.5/spark.html

查看原文