# spark

分布式计算引擎框架,基于 mapreduce 开发

单机:单进程,单节点

伪分布式:多进程,单节点

分布式:多进程,多节点

分布式计算核心:切分数据,减少数据规模

image-20240804223237028

spark 分布式集群采用集群中心化

image-20240804223253379

框架:不完整的计算机程序 (核心功能已经开发完毕,但是是和业务相关的代码未开发)(MR,spark)

系统:完整的计算机程序 (HDFS,Kafka)

image-20240804223306758

引擎:核心功能

spark 基于 mr 开发,两者区别

1. 开发语言:mr:java,不适合进行大量数据处理。spark:scala,适合大量数据处理,封装大量功能

2. 处理方式:hadoop 出现的早,只考虑单一的计算操作

image-20240804223319497

spark 优化了计算过程

image-20240804223331386

回顾:Hadoop 主要解决,海量数据的存储和海量数据的分析计算。

Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

image-20240804223347611

spark 内置模块

image-20240804223358385

# 部署 spark 集群

部署 Spark 其实指的就是 Spark 的程序逻辑在什么资源中执行

image-20240804223413816

如果资源是当前单节点提供的,那么就称之为单机模式

如果资源是当前多节点提供的,那么就称之为分布式模式

如果资源是由 Yarn 提供的,那么就称之为 Yarn 部署环境

如果资源是由 Spark 提供的,那么就称之为 Spark 部署环境 (Standalone

生产环境中主要采用:yarn+spark 也称之为(spark on yarna)

(1) Local 模式:在本地部署单个 Spark 服务

(2) Standalone 模式:Spark 自带的任务调度模式。(国内不常用)

(3) YARN 模式:Spark 使用 Hadoop 的 YARN 组件进行资源与任务调度。(国内最常用)

(4) Mesos 模式:Spark 使用 Mesos 平台进行资源与任务的调度。(国内很少用)

# 部署 local

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
wget https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
tar xzvf spark-3.4.3-bin-hadoop3.tgz

bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ./examples/jars/spark-examples_2.12-3.4.3.jar 10

--master 指定资源提供者
local 单线程
local[2] 两个线程执行
local[*] 使用全部核

24/06/12 08:09:06 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.055199 s
Pi is roughly 3.1425071142507113
24/06/12 08:09:06 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/06/12 08:09:06 INFO SparkUI: Stopped Spark web UI at http://hadoop100:4040
24/06/12 08:09:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/06/12 08:09:06 INFO MemoryStore: MemoryStore cleared
24/06/12 08:09:06 INFO BlockManager: BlockManager stopped
24/06/12 08:09:06 INFO BlockManagerMaster: BlockManagerMaster stopped
24/06/12 08:09:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/06/12 08:09:06 INFO SparkContext: Successfully stopped SparkContext
24/06/12 08:09:06 INFO ShutdownHookManager: Shutdown hook called

image-20240804223435814

Spark 在运行时,会启动进程,申请资源,执行计算,但是一旦计算完毕,那么进程会停止,资源会释放掉

Stopped Spark web UI at http://hadoop100:4040

image-20240804223450585

# yarn 模式

编辑启动关闭脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/bin/bash
echo "========hadoop100=========="
ssh root@hadoop100 "/root/jdk8u352-b08/bin/jps"
echo "========hadoop101=========="
ssh root@hadoop101 "jps"
echo "========hadoop102=========="
ssh root@hadoop102 "jps"


#!/bin/bash
ssh root@hadoop100 "source /root/admin-openrc;/root/hadoop-3.3.6/sbin/start-dfs.sh"
ssh root@hadoop101 "source /root/admin-openrc;/root/hadoop-3.3.6/sbin/start-yarn.sh"
ssh root@hadoop100 "source /root/admin-openrc;/root/hadoop-3.3.6/bin/mapred --daemon start historyserver"


#!/bin/bash
ssh root@hadoop100 "source /root/admin-openrc;/root/hadoop-3.3.6/bin/mapred --daemon stop historyserver"
ssh root@hadoop101 "source /root/admin-openrc;/root/hadoop-3.3.6/sbin/stop-yarn.sh"
ssh root@hadoop100 "source /root/admin-openrc;/root/hadoop-3.3.6/sbin/stop-dfs.sh"
vim spark-env.sh
YARN_CONF_DIR=/root/hadoop-3.3.6/etc/hadoop/

hadoop-start.sh

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn ./examples/jars/spark-examples_2.12-3.4.3.jar 10



root@hadoop100:~# jpsall
========hadoop100==========
42388 DataNode
42683 NodeManager
43820 Jps
42894 JobHistoryServer
43583 SparkSubmit
42191 NameNode
========hadoop101==========
37904 Jps
37169 NodeManager
36818 ResourceManager
36610 DataNode
37843 YarnCoarseGrainedExecutorBackend
37721 ExecutorLauncher
========hadoop102==========
37362 DataNode
37509 SecondaryNameNode
37626 NodeManager
38093 Jps

image-20240804223505654

配置历史服务

1
2
3
4
5
6
7
8
9
10
11
vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop100:8020/directory


vim spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop100:8020/directory -Dspark.history.retainedApplications=30"

vim spark-defaults.conf
spark.yarn.historyServer.address=hadoop100:18080
spark.history.ui.port=18080

hdfs 中创建 /directory

image-20240804223520765

1
2
3
4
5
6
sbin/start-history-server.sh

bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn ./examples/jars/spark-examples_2.12-3.4.3.jar 10


将spark历史记录保存到了hadoop history中

运行时,会将 yarn 需要用到的 lib 和 conf 上传到 hdfs 中

1
2
24/06/14 06:32:57 INFO Client: Uploading resource file:/tmp/spark-4dbf287c-9986-44f0-89c1-7ede052800c0/__spark_libs__5882056198812431005.zip -> hdfs://hadoop100:8020/user/root/.sparkStaging/application_1718182778487_0005/__spark_libs__5882056198812431005.zip
24/06/14 06:32:58 INFO Client: Uploading resource file:/tmp/spark-4dbf287c-9986-44f0-89c1-7ede052800c0/__spark_conf__3646583648306474590.zip -> hdfs://hadoop100:8020/user/root/.sparkStaging/application_1718182778487_0005/__spark_conf__.zip

运行结束会删除文件

1
2
24/06/14 06:33:07 INFO ShutdownHookManager: Deleting directory /tmp/spark-f3d8debb-7304-4251-b80b-11b1ab91f45c
24/06/14 06:33:07 INFO ShutdownHookManager: Deleting directory /tmp/spark-4dbf287c-9986-44f0-89c1-7ede052800c0

yarn 模式中有 client 和 cluster 模式,主要区别在于:Driver 程序的运行节点。

yarn-client:Driver 程序运行在客户端,适用于交互、调试,希望立即看到 Japp 的输出。

yarn-cluster:Driver 程序运行在由 ResourceManager 启动的 APPIMaster, 适用于生产环境。

image-20240804223534005

默认使用的客户端模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 使用cluster模式
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster ./examples/jars/spark-examples_2.12-3.4.3.jar 10

========hadoop100==========
42388 DataNode
51081 Jps
42683 NodeManager
44508 HistoryServer
42894 JobHistoryServer
50959 SparkSubmit
42191 NameNode
========hadoop101==========
37169 NodeManager
36818 ResourceManager
36610 DataNode
40596 Jps
========hadoop102==========
37362 DataNode
37509 SecondaryNameNode
42121 Jps
37626 NodeManager
41995 ApplicationMaster

# standalone 模式

Standalone 模式是 Spark 自带的资源调度引擎,构建一个由 Master+VVorker 构成的 Spark 集群,Spark 运行在集群中。

这个要和 Hadoop 中的 Standalone 区别开来。这里的 Standalone 是指只用 Spark 来搭建一个集群,不需要借助 Hadoop 的 Yarn 和 Mesos 等其他框架。

image-20240804223542959

# mesos 模式

Spark 客户端直接连接 Mesos; 不需要额外构建 Spark 集群。国内应用比较少,更多的是运用 Yarn 调度。

# 模式对比

image-20240804223557692

# 端口号

1) Spark 查看当前 Spark-shell 运行任务情况端口号:4040

2) Spark 历史服务器端口号:18080 (类比于 Hadoop 历史服务器端口号:19888)

# rdd

RDD: 分布式计算模型

1. 一定是一个对象

2. 一定封装了大量方法和属性

3. 一定需要适合进行分布式处理 (减小数据规模,并行计算算)

image-20240804223609055

# RDD 编程

在 Spark 中创建 RDD 的创建方式可以分为三种:从集合中创建] RDD、从外部存储创建 RDD、从其他 RDD 创建。

RDD 的处理方式和 JavalO 流完全一样,也采用装饰者设计式来实现功能的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>


package org.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");

// 创建spark配置对象
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");

// 构建spark运行环境
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

// 构建spark运行环境
//JavaSparkContext javaSparkContext = new JavaSparkContext("local","spark");

// 释放资源
javaSparkContext.close();
}
}

# 对接内存数据构建 RDD 对象

parallelize 方法可以传递参数:集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.example;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");

// 创建spark配置对象
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");

// 构建spark运行环境
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

// 构建spark运行环境
//JavaSparkContext javaSparkContext = new JavaSparkContext("local","spark");

// 对接数据源
List<String> names = Arrays.asList("zhangsan", "lisi");

JavaRDD<String> rdd = javaSparkContext.parallelize(names);

List<String> collect = rdd.collect();

collect.forEach(System.out::println);
// 释放资源
javaSparkContext.close();
}
}

# 对接磁盘数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main1 {
public static void main(String[] args) {
System.out.println("Hello world!");

// 创建spark配置对象
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");

// 构建spark运行环境
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("C:\\Users\\Administrator\\IdeaProjects\\MapReduceDemo\\data\\text");

List<String> collect = stringJavaRDD.collect();
collect.forEach(System.out::println);

// 释放资源
javaSparkContext.close();
}
}

# 磁盘数据分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class partition {
public static void main(String[] args) {
System.out.println("Hello world!");

// 创建spark配置对象
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");

// 构建spark运行环境
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

// idea开发时相对路径默认以项目根路径为基准
//JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("C:\\Users\\Administrator\\IdeaProjects\\MapReduceDemo\\data\\text");
// textfile 第二个参数最小分区数,不传递的时候使用默认值
// 1.textFile可以传递第二个参数:minPartitions(最小分区数)
//参数可以不需要传递的,那么Spark会采用默认值
//minPartitions = math.min(defaultParallelism,2)
//2.使用配置参数:spark.default.parallelism=>4=> 4=> math.min(参数,2)
//3.采用环境默认总核值=>math.min(总核数,2)



JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("data\\text");


stringJavaRDD.saveAsTextFile("output1222");
// 释放资源
javaSparkContext.close();
}
}

# 内存数据源,分区数据分配

​ 数据分配方式 (i*length)/numSlices,(((i + 1) * 1) * length) /numSlices)

# 磁盘数据源

Spark 不支持文件操作的。文件操作都是由 Hadoop 完成的

Hadoop 进行文件切片数量的计算和文件数据存储计算规则不样

1. 分区数量计算的时候,考虑的是尽可能的平均:按字节来计算

2. 分区数据的存储是考虑业务数据的完整性:按照行来读取

读取数据时,还需要考虑数据偏移量,偏移量从 0 开始的。

读取数据时,相同的偏移量不能重复读取。

使用 spark 时,数据不能全放一行,会造成数据倾斜

# transformation 转换算子

# map

image-20240804223623531

1
2
3
4
5
6
7
8
9
10
11
12
13
Scala语言中可以将无关的数据封装在一起,形成一个整体,称之为元素的组合,简称为【元组】
如果想要访问元组中的数据,必须采用顺序号

var kv1 = ("haha",1,2)
JDK1.8以后也存在元组,采用特殊的类:TupleX

Tuple2<String,String> tuple2 = new Tuple2<>("abc", "1");


System.out.println(tuple2._1);
System.out.println(tuple2._2());
tuple中最大容量为22
使用时可以 ._1 也可以 ._1()
# 函数式编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.example.rdd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class operator3 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

javaSparkContext.parallelize(Arrays.asList(1,2,3,4),2).map(NumberTest::mul2).collect().forEach(System.out::println);
}
}

class NumberTest {
public static int mul2(Integer num2) {
return num2 *= 2;
}
}

image-20240804223648424

RDD 不会保存数据,不会等每一个 rdd 执行完再执行下一个

# filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class filter {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<Integer> parallelize111 = javaSparkContext.parallelize(Arrays.asList(1, 2, 3),2);

// TODO filter 过滤 对数据源中数据进行筛选 满足保留,不满足丢弃
// 返回结果为true满足 ,返回false不满足
// JavaRDD<Integer> filter = parallelize111.filter(new Function<Integer, Boolean>() {
// @Override
// public Boolean call(Integer num) throws Exception {
// return true;
// }
// });
// filter.collect().forEach(System.out::println);

JavaRDD<Integer> filterrdd = parallelize111.filter(
num -> true
);
// JavaRDD<Integer> filterrdd111 = parallelize111.filter(
// num -> (num % 2 ==1)
// );

/// filter 执行过程中可能会造成数据倾斜
filterrdd.collect().forEach(System.out::println);
javaSparkContext.close();
}
}
# flatmap

数据扁平化,扁平映射(整体变为个体)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class flatmap {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<List<Integer>> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)), 2);

// JavaRDD<Integer> integerJavaRDD = parallelizerdd.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
// @Override
// public Iterator<Integer> call(List<Integer> integers) throws Exception {
// return integers.iterator();
// }
// }
// );

JavaRDD<Integer> integerJavaRDD = parallelizerdd.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
@Override
public Iterator<Integer> call(List<Integer> integers) throws Exception {
List<Integer> objectArrayList = new ArrayList<>();
integers.forEach(num -> objectArrayList.add(num * 2));
return objectArrayList.iterator();
}
}
);
integerJavaRDD.collect().forEach(System.out::println);
javaSparkContext.close();
}
}
data/text:
hadoop python
java php golang
B
V
D

public class flatmap {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD<List<Integer>> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)), 2);


JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("data/text");
JavaRDD<String> stringJavaRDD1 = stringJavaRDD.flatMap(
line -> Arrays.asList(line.split(" ")).iterator()
);
stringJavaRDD1.collect().forEach(System.out::println);
javaSparkContext.close();
}
}
# groupby
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class groupby {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 4 ,6), 2);


// 按照指定的规则分组数据
// parallelizerdd.groupBy(new Function<Integer, Object>() {
//
// public Object call(Integer integers) throws Exception {
// // 返回值是数据对应组的名称,相同名称的数据防止在同一个组中
// if (integers % 2 == 0)
// {return "123";}
// else {return "456";}
// }
// }).collect().forEach(System.out::println);
//(123,[2, 4, 6])
//(456,[1])
parallelizerdd.groupBy(num -> num % 2 == 0).collect().forEach(System.out::println);
javaSparkContext.close();
}
# shuffle

默认情况下,数据处理后,所在的分区不会发生变化,但是 groupBy 方法例外

Spark 在数据处理中,要求同一个组的数据必须在同一个分区中

所以分组操作会将数据分区打乱重新组合,在 spark 中称为 shuffle

一个分区可以存放多个组,,所有数据必须分组后才能继续执行操作

RDD 对象不能保存数据,当前 groupBy 操作会将数据保存到磁盘文件中,保证数据全部分组后执行后续操作

image-20240804223703724

shuffle 操作一定会落盘

shuffle 操作有可能会导致资源浪费

Spark 中含有 shuffle 操作的方法都有改变分区的能力

RDD 的分区和 task 有关系

1
2
3
4
5
6
7
8
9
10
11
12
public class groupby2 {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 4 ,6, 7, 8), 3);
parallelizerdd.groupBy(num -> num % 2 == 0,2).collect().forEach(System.out::println);
Thread.sleep(100000L);
javaSparkContext.close();
}
}

shuffle 会将完整的计算流程一分为二,其中一部分任务会写磁盘,另外一部分的任务会读磁盘

写磁盘的操作不完成,不允许读磁盘

# distinct

hashset 是单点去重

distinct 是分布式去重,采用分组 + shuffle 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class distinct {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 4 ,6, 7, 8, 1, 2), 3);

parallelizerdd.distinct().collect().forEach(System.out::println);


//parallelizerdd.groupBy(num -> num % 2 == 0,2).collect().forEach(System.out::println);
//Thread.sleep(100000L);
javaSparkContext.close();
}
}
# sortby

按照指定规则排序

第一个参数表示排序规则

Spark 会为每一个数据增加一个标记,然后按照标记对数据进行排序

第二个参数表示排序的方式:升序 (true), 降序 (false)

第三个参数表示分区数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class sortby {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(12, 52, 4 ,6, 7, 8, 1, 2), 3);
parallelizerdd.saveAsTextFile("sort222");

parallelizerdd.sortBy(new Function<Integer, Object>() {
@Override
public Object call(Integer integerssss) throws Exception {
return integerssss;
}
},true,2)
// .collect()
//.forEach(System.out::println);
.saveAsTextFile("sort333");

//parallelizerdd.groupBy(num -> num % 2 == 0,2).collect().forEach(System.out::println);
//Thread.sleep(100000L);
javaSparkContext.close();
}
}

return “”+integerssss; // 按照字典排序

# coalesce

缩减分区

coalesce 方法默认没有 shuffle 功能,所以数据不会被打击乱重新组合,所以如果要扩大分区是无法实现的

1
filterrdd.coalesce(3,true);    // shuffle 为true的时候可以扩大分区
# repartition

重分区,就是设定 shuffle=true 的 coalesce 方法

1
filterrdd.repartition(3)

# kv

Spark RDD 会整体数据的处理就称之为单值类型的数据处理

Spark RDD 会 KV 数据个体的处理就称之为 KV 类型的数据处理:K 和 V 不作为整体使用

mapValues 方法只对 V 进行处理,K 不做任何操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class kv {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 4 ,6, 7, 8, 1, 2), 3);

Tuple2<String, Integer> a = new Tuple2<>("a", 1);
Tuple2<String, Integer> b = new Tuple2<>("b", 2);

List<Tuple2<String, Integer>> list = Arrays.asList(a, b);

JavaPairRDD<String, Integer> parallelized = javaSparkContext.parallelizePairs(list);
parallelized.mapValues(num -> num*2).collect().forEach(System.out::println);

javaSparkContext.close();
}
}
public class kv2 {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//JavaRDD <Integer> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 4 ,6, 7, 8, 1, 2), 3);

List<Integer> integerList = Arrays.asList(1, 2, 3, 4);

JavaRDD<Integer> integerJavaRDD = javaSparkContext.parallelize(integerList);

integerJavaRDD.mapToPair(
num -> new Tuple2<>(num, num*2)
).collect().forEach(System.out::println);

// integerJavaRDD.mapToPair(
// num -> new Tuple2<>(num, num*2)
// ).mapValues(num->num*2).collect().forEach(System.out::println);

javaSparkContext.close();
}
}
# mapvalue
# groupbykey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class kv3_groupbykey {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[*]");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);


// JavaRDD<Tuple2<String, Integer>> parallelized = javaSparkContext.parallelize(Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3), new Tuple2<>("b", 4)));
//
// JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> stringIterableJavaPairRDD = parallelized.groupBy(t -> t._1);
//
//
// stringIterableJavaPairRDD.collect().forEach(System.out::println);

// (a,[(a,1), (a,3)])
//(b,[(b,2), (b,4)])

javaSparkContext.parallelizePairs((Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3), new Tuple2<>("b", 4)))).groupByKey().collect().forEach(System.out::println);
javaSparkContext.close();

//(a,[1, 3])
//(b,[2, 4])
}
}

groupby 底层调用 greoupbykey

groupbykey 有 shuffle

# reducebykey

reduceByKey 方法的作用:将 KV 类型的数据按照 K 对 V 进行 reduce (将多个值聚合成 1 个值) 操作

基本思想:两两计算

1
JavaPairRDD<String ,Integer> wordcountrdd = parallelized.reduceByKey(Integer::sum);
# sortbykey

groupByKey : 按照 K 对 V 进行分组

reduceByKey 按照 K 对进行两两聚合

sortByKey 按照 K 排序

1
JavaPairRDD<String ,Integer> sortrdd = parallelized.sortByKey();

# RDD 行动算子

RDD 的行动算子会触发作业 (Job) 的执行

转换算子的目的:将旧的 RDD 转换成新的 RDD, 为了组合多个 RDD 的功能

返回值是 rdd,是转换算子。具体值是行动算子

collect 将 executor 执行的结果按照分区的数据拉取回到 driver,将结果组合成集合对象

1
2
3
4
JavaRDD<Integer> integerJavaRDD = parallelize111.map(num -> num * 2);
// collect 是行动算子
List<Integer> collect = integerJavaRDD.collect();
collect.forEach(System.out::println);

Spark 在编写代码时,调用转换算子,并不会真正执行,因为只是在 Driver 端组合功能

所以当前的代码其实就是在 Driver 端执行

所以当前 main 方法也称之为 driver 方法,当前运行 main 纟我程,也称之 Driver 线程

转换算子中的逻辑代码是在 Executor 端执行的。并不会在 tDriver 端调用和执行。

RDD 封装的逻辑其实就是转换算子中的逻辑

集合数据

image-20240804223718854

文件:读取切片

image-20240804223732201

collect 方法就是将 Executor 端执行的结果按照分区的顺序位取 (采集) 回到 Driver 端,将结果组合成集合对象

collect 方法可能会导致多个 Executor 的大量数据拉取到 Driiver 端,导致内存溢出,所以生成环境慎用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 JavaRDD<Integer> parallelize111 = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4),2);
JavaRDD<Integer> integerJavaRDD = parallelize111.map(num -> num * 2);
// collect 是行动算子,用于采集数据
List<Integer> collect = integerJavaRDD.collect();
collect.forEach(System.out::println);
// count获取结果数量
long count = integerJavaRDD.count();
// 获取结果的第一个
Integer first = integerJavaRDD.first();
// 从结果中获取前n个
List<Integer> take = integerJavaRDD.take(3);


// countbykey 将结果按照key计算数量
// {4=1, 2=1, 1=1, 3=1}
JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD = parallelize111.mapToPair(num -> new Tuple2<>(num, num));
Map<Integer, Long> integerLongMap = integerIntegerJavaPairRDD.countByKey();

// 保存
integerIntegerJavaPairRDD.saveAsTextFile("a");
// 保存对象
integerIntegerJavaPairRDD.saveAsObjectFile("bb");
// 单线循环
parallelize111.collect().forEach(System.out::println);

// 分布式循环,占内存比较小,执行效率低
parallelize111.foreach(
System.out::println
);
// 执行效率高,依托于内存大小
parallelize111.foreachPartition(System.out::println);

javaSparkContext.close();

main 方法也叫 driver 方法,foreach 是在 executor 执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class action_serialize {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaRDD<Integer> parallelize111 = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4), 2);

Student1 s = new Student1();

parallelize111.foreach(
num -> {
System.out.println(s.age + num);
}
);

javaSparkContext.close();
}
}


class Student1 implements Serializable {
public int age = 30;
}

Student 想 new ,得在 executor 上拉取

在 Executor 端循环遍历的时候使用到了 Driver 端对象

运行过程中,就需要将 Driver 端的对象通过网络传递到 Executor 端,否则无法使用

传输的对象必须要实现可序列化接口,否则无法传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
        JavaRDD<String> parallelize111 = javaSparkContext.parallelize(Arrays.asList("haha","Haha"), 2);
Search search = new Search("H");
search.match(parallelize111);

javaSparkContext.close();
}
}


class Search implements Serializable {
private String query; // query是成员变量,需要序列化

public Search(String query){
this.query = query;
}

public void match(JavaRDD<String> rdd) {
rdd.filter(
s -> s.startsWith(query)
).collect().forEach(System.out::println);
}
}


/*
class Search implements Serializable {
private String query; // query是成员变量,需要序列化

public Search(String query){
this.query = query;
}

public void match(JavaRDD<String> rdd) {
String q = this.query; // q是局部变量,在栈中,和search无关,不需要序列化
rdd.filter(
s -> s.startsWith(q)
).collect().forEach(System.out::println);
}
}

rdd 算子 (方法) 的逻辑代码是在 executo 执行的,其他的是在 driver 执行的

collect 是行动算子,没有逻辑代码

filter 中的成为逻辑代码

1
2
3
4
5
6
7
8
9
10
11
12
//        parallelize111.foreach(
// executor端执行
// num -> System.out.println(num)
// );

// jdk1.8 函数式编程采用对象模拟,使用这种方式会报错,但是系统的类无法改写继承序列化
parallelize111.foreach(
// 在driver端创建
System.out::println
// PrintStream out = System.out;
// out::println
);

# kryo

Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
        <dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.0.3</version>
</dependency>
import com.atguigu.bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class Test02_Kryo {
public static void main(String[] args) throws ClassNotFoundException {

// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用kryo序列化的自定义类
.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});

// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);

// 3. 编写代码
User zhangsan = new User("zhangsan", 13);
User lisi = new User("lisi", 13);

JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);

JavaRDD<User> mapRDD = userJavaRDD.map(new Function<User, User>() {
@Override
public User call(User v1) throws Exception {
return new User(v1.getName(), v1.getAge() + 1);
}
});

mapRDD. collect().forEach(System.out::println);

// 4. 关闭sc
sc.stop();
}
}

# 依赖

RDD 转换算子 (方法):RDD 可以通过方法将旧的 RDD 转换成新的 RDD

RDD 依赖:Spark 中相邻的 2 个 RDD 之间存在的依赖关系

连续的依赖关系称为血缘关系

image-20240804223744879

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class flatmap {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local");
sparkConf.setAppName("spark");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//JavaRDD<List<Integer>> parallelizerdd = javaSparkContext.parallelize(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)), 2);

//System.out.println(parallelizerdd.toDebugString());

JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("data/text");
System.out.println(stringJavaRDD.toDebugString());
System.out.println("============");
JavaRDD<String> stringJavaRDD1 = stringJavaRDD.flatMap(
line -> Arrays.asList(line.split(" ")).iterator()
);
System.out.println(stringJavaRDD1.toDebugString());
stringJavaRDD1.collect().forEach(System.out::println);
javaSparkContext.close();
}
}


(1) data/text MapPartitionsRDD[1] at textFile at flatmap.java:20 []
| data/text HadoopRDD[0] at textFile at flatmap.java:20 []
============
(1) MapPartitionsRDD[2] at flatMap at flatmap.java:23 []
| data/text MapPartitionsRDD[1] at textFile at flatmap.java:20 []
| data/text HadoopRDD[0] at textFile at flatmap.java:20 []

shuffle +- 代表两段流程
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("data/text");
// System.out.println(stringJavaRDD.toDebugString());
System.out.println(stringJavaRDD.rdd().dependencies());
System.out.println("============");
JavaRDD<String> stringJavaRDD1 = stringJavaRDD.flatMap(
line -> Arrays.asList(line.split(" ")).iterator()
);
// System.out.println(stringJavaRDD1.toDebugString());
System.out.println(stringJavaRDD1.rdd().dependencies());

stringJavaRDD1.groupBy(num -> num);
System.out.println(stringJavaRDD1.rdd().dependencies());
stringJavaRDD1.collect().forEach(System.out::println);
javaSparkContext.close();


List(org.apache.spark.OneToOneDependency@4b5a078a)
============
List(org.apache.spark.OneToOneDependency@33f2df51)
24/06/25 17:16:52 INFO FileInputFormat: Total input files to process : 1
List(org.apache.spark.OneToOneDependency@33f2df51)

onetoonedep 窄依赖

shuffledep 宽依赖

rdd 的依赖关系是 rdd 对象中分区数据的关系

窄依赖:如果上游 rdd 一个分区的数据被下游 rdd 的一个分区独享

宽依赖:如果上游 rdd 一个分区的数据被下游 rdd 的多个分区共享。会将分区数据打乱重新组合,所以此层存在 shuffle 操作

依赖关系和任务数量,阶段数量

作业 (Job): 行动算子执行时,会触发作业的执行 (ActiveJob)

阶段 (Stage): 一个 job 中 RDD 的计算流程,默认就一个完整的阶段,但是如果计算流程中存在 shuffle, 那么流程就会一分为二。分开的每一段就称之为 stage,前一个阶段不执行完,后一个阶段不允许执行

任务 (Task): 每个 Executor 执行的计算单元

阶段的数量和 shuffle 依赖的数量有关系:1+shuffle 依赖的数量

任务(分区)的数量就是每个阶段分区的数量之和,一般推荐分区数量为资源核数的 2-3 倍

image-20240804223805444

任务 (Task): 每个 Executor 执行的计算单元

任务的数量其实就是每个阶段最后一个 RDD 分区的数量之和

移动数据不如移动计算

# 持久化

持久化:将对象长时间的保存

序列化:内存中对象 =>byte 序列 (byte 数组)

image-20240804223856875

maprdd.cache();

image-20240804223912457

maprdd.persist(Storage.MEMORY_ONLY());

cache 方法底层调用 persist. maprdd.persist (Storage.MEMORY_ONLY ()); === cache ()

// 落盘持久化

maprdd.persist(Storage.DISK_ONLY());

MEMORY_ONLY 超出数据直接丢弃

MEMORY_AND_DISK. 内存满了放磁盘

MEMORY_ONLY_SER 序列化后再存内存

MEMORY_ONLY_SER_2. 副本 2 份

# checkpoint

将计算结果保存到 hdfs 或者本地文件路径,实现不同进程共享

1
2
3
4
5
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
javaSparkContext.setCheckpointDir("cp");

mapRDD.cache();
mapRDD.checkPoint();

检查点目的是 rdd 结果长时间保存,所以需要保证数据安全,会从头再跑一遍。把第二遍结果放到里面

性能比较低,可以在检查点之前执行 cache,将数据缓存

cache 方法会在血缘关系中增加依赖关系

checkpoint 方法改变血缘关系

每个 shuffle 都自动带有缓存,为了提高 shuffle 算子的性能

如果重复调用相同规则的 shuffle 算子,第二个 shfulle 算子不会有相同 shuffle 操作

使用完缓存,可以使用 unpersist 释放缓存

# 分区器

数据分区的规则

计算后数据所在的分区是通过 Spark 的内部计算 (分区) 完成我。尽可能均衡一些(hash)

reducebykey 需要传递两个参数,第一个参数是数据分区的规则,第二个参数是数据聚合逻辑

第一个参数可以不用传递,使用时会使用默认分区规则。默认分区规则中使用 HashPartitioner

hashpartitioner 中有一个方法叫 getpartition,需要传递一个参数 key,返回一个值,表示分区编号,从 0 开始

得到一个分区编号,key.hashcode % partnum (hash 取余)

只有 kv 类型的有分区器

rdd.partitioner()

Spark 目前支持 Hash 分区、Range 分区和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区和 Reduce 的个数。

自定义分区器

1. 创建自定义类

2. 继承抽象类 Partitioner

3. 重写方法 (2)

4. 构建对象,在算子中使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Part extends Partitioner {
@Override
// 指定分区数量
public int numPartitions() {
return 3;
}

@Override
// 很久数据的key来获取数据存储的分区编号
public int getPartition(Object key) {
if("a".equals(key))
return 0;
else if ("b".equals(key)) {
return 1;
} else {
return 2;
}
}
}

mapRDD.reduceByKey(new Part(),Integer::sum)
class Part extends Partitioner {
private int numPart;

public Part(int num) {
this.numPart = num;
}
@Override
// 指定分区数量
public int numPartitions() {
return 3;
}

@Override
// 很久数据的key来获取数据存储的分区编号
public int getPartition(Object key) {
if("a".equals(key))
return 0;
else if ("b".equals(key)) {
return 1;
} else {
return 2;
}
}

@Override
public int hashCode() {
return numPart;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Part) {
Part other = (Part)obj;
return this.numPart == other.numPart;
} else {
return false;
}
}
}

RDD 在 foreach 循环时,逻辑代码和操作全部都是在 Executor 端完成的,那么结果不会拉取回到 Driver 端

RDD 无法实现数据拉取操作

如果 Executor 端使用了 Driver 端数据,那么需要从 Driver 端将数据拉取到 Executor 端

数据拉取的单位是 Task (任务)

默认数据传输以 Task 为单位进行传输,如果想要以 Executor 为单位传输,那么需要进行包装 (封装)

Spark 需要采用特殊的数据模型实现数据传输:广播变量

jsc.broadcast(list);

rdd.filter(s -> broadcast.value())

# sparksql

Spark SQL: 结构化数据处理模块

SQL: 为了数据库数据访问开发的语言

Spark 封装模块的目的就是在结构化数据的场合,处理起来方便

结构化数据:特殊结构的数据 =>(table,json)

半结构化数据:xml,html

非结构化数据:压缩文件,图形文件,视频,音频文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>


public class envv {
public static void main(String[] args) {
// SparkConf sparkConf = new SparkConf();
// sparkConf.setMaster("local");
// sparkConf.setAppName("SparkSQL");
//
// SparkContext sc = new SparkContext(sparkConf);
// SparkSession sparkSession = new SparkSession(sc);
//
// sparkSession.close();
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();
sparkSession.close();
}
}
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");
//rowDataset.rdd();

// 将数据模型转换成表
rowDataset.createOrReplaceTempView("user");

// 使用sql文的方式操作
String sql = "select avg(age) from user";
Dataset<Row> rowDataset1 = sparkSession.sql(sql);

// 展示数据模型效果
rowDataset1.show();

sparkSession.close();
}
# 环境之间转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// core:sparkcontext -> sql:sparksession
new sparkSession( new SparkContect(conf));

// sql -> core:sparkcontext
sparksession.sparkContext().parallelize();

//sql -> core:javasparkcontext
SparkContext sparkContext = sparkSession.sparkContext();
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
javaSparkContext.parallelize(Arrays.asList(1,2,3,4));
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");
//rowDataset.rdd();

//rowDataset.foreach(
// row ->{
// System.out.println(row.getInt(2));
// }
//);

// 数据模型中的数据类型进行转换,将row转换成其他对象处理
Dataset<User> userDataset = rowDataset.as(Encoders.bean(User.class));

userDataset.foreach(
user -> {
System.out.println(user.getname());
}
);
sparkSession.close();
}
}

class User implements Serializable {
private int id;
private int age;
private String name;


User(int id, int age, String name) {
this.id = id;
this.age = age;
this.name = name;
}


}
public class model {
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");


rowDataset.createOrReplaceTempView("user");

sparkSession.sql("select * from user").show();

sparkSession.close();
}
}
public class model2 {
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");


rowDataset.select("*").show();

sparkSession.close();
}
}
# 自定义 UDF
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class sql_3 {
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");
//rowDataset.rdd();

// 将数据模型转换成表
rowDataset.createOrReplaceTempView("user");

// 使用sql文的方式操作
// SparkSQL提供了一种特殊的方式,可以在SQL中增加自定义方法来实现复杂的逻辑
//如果想要自定义的方法能够在SQL中使用,那么必须在SPark中进行声明和注册
// register方法需要传递3个参数
//第一个参数表示SQL中使用的方法名
//第二个参数表示逻辑:IN=>OUT
//第三个参数表示返回的数据类型 ,DataType类型数据,需要使用scala语法操作
sparkSession.udf().register("prefixName", new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
return "Name:" + s;
}
}, StringType$.MODULE$);

String sql = "select prefixName(name) from user";
Dataset<Row> rowDataset1 = sparkSession.sql(sql);

// 展示数据模型效果
rowDataset1.show();

sparkSession.close();
}
}
import static org.apache.spark.sql.types.DataTypes.StringType;

public class sql_udf {
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");
//rowDataset.rdd();

// 将数据模型转换成表
rowDataset.createOrReplaceTempView("user");

// 使用sql文的方式操作
// SparkSQL提供了一种特殊的方式,可以在SQL中增加自定义方法来实现复杂的逻辑
//如果想要自定义的方法能够在SQL中使用,那么必须在SPark中进行声明和注册
// register方法需要传递3个参数
//第一个参数表示SQL中使用的方法名
//第二个参数表示逻辑:IN=>OUT
//第三个参数表示返回的数据类型 ,DataType类型数据,需要使用scala语法操作
// sparkSession.udf().register("prefixName", new UDF1<String, String>() {
// @Override
// public String call(String s) throws Exception {
// return "Name:" + s;
// }
// }, DataTypes.StringType);

sparkSession.udf().register("prefixName", new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
return "Name:" + s;
}
}, StringType);

String sql = "select prefixName(name) from user";
Dataset<Row> rowDataset1 = sparkSession.sql(sql);

// 展示数据模型效果
rowDataset1.show();

sparkSession.close();
}
}

UDF 函数是每一行数据会都用一次函数

UDAF 函数是所有的数据产生一个结果数据

image-20240804223942980

image-20240804224006080

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package org.example.sparksql;

import java.io.Serializable;

public class AvgAgeBuffer implements Serializable {
private long total;
private long count;

AvgAgeBuffer() {
}
AvgAgeBuffer(long total, long count) {
this.total = total;
this.count = count;
}

public long getCount() {
return count;
}

public void setCount(long count) {
this.count = count;
}

public long getTotal() {
return total;
}

public void setTotal(long total) {
this.total = total;
}
}




package org.example.sparksql;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// 自定义UDAF函数,实现年龄的平均值
//1.创建自定义的【公共】类
//2.继承 org.apache.spark.sql.expressions.Aggreegator
//3.设定泛型
//IN:输入数据类型
//BUFF:缓冲区的数据类型
//OUT:输出数据类型
//4.重写方法(4(计算)+2(状态))

public class MyAvgAgeUDAF extends Aggregator<Long, AvgAgeBuffer, Long> {

@Override
// 缓冲区初始化操作
public AvgAgeBuffer zero() {
return new AvgAgeBuffer(0L, 0L);
}

@Override
// 把输入年龄和缓冲器数据聚合
public AvgAgeBuffer reduce(AvgAgeBuffer buffer, Long age) {
buffer.setTotal(buffer.getTotal() + age);
buffer.setCount(buffer.getCount()+1);
return buffer;
}

@Override
// 合并缓冲区数据
public AvgAgeBuffer merge(AvgAgeBuffer b1, AvgAgeBuffer b2) {
b1.setTotal(b1.getTotal()+b2.getTotal());
b1.setCount(b1.getCount()+b2.getCount());
return b1;
}

@Override
// 计算最终结果
public Long finish(AvgAgeBuffer reduction) {

return reduction.getTotal() / reduction.getCount();
}

@Override
public Encoder<AvgAgeBuffer> bufferEncoder() {
return Encoders.bean(AvgAgeBuffer.class);
}

@Override
public Encoder<Long> outputEncoder() {
return Encoders.LONG();
}
}





package org.example.sparksql;

import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;

import java.io.Serializable;

public class sql_udaf {
public static void main(String[] args) {
// Spark在结构化数据的处理场景中对核心功能,环境进行了封装
// 构建SparkSQL的环境对象时,一般采用构建器模式
// 构建器模式:构建对象
final SparkSession sparkSession = SparkSession.builder().master("local").appName("sparkSQL").getOrCreate();

// Spark SQL中对数据模型也进行了封装:RDD ->Dataset
// 对接文件数据源时,会讲文件中一行数据封装为row对象
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");
//rowDataset.rdd();

// 将数据模型转换成表
rowDataset.createOrReplaceTempView("user");


//SparkSQL采用特殊的方式将UDAF转换成UDF使用
//UDAF使用时需要创建自定义聚合对象
// 两个恶参数,第一个UADF对象,第二个表示UADF对象
sparkSession.udf().register("avgage", functions.udaf(new MyAvgAgeUDAF(),Encoders.LONG()));

String sql = "select avgage(age) from user";
sparkSession.sql(sql).show();

// 展示数据模型效果
// rowDataset1.show();

sparkSession.close();
}
}
# 数据加载与保存

SparkSQL 读取和保存的文件一般为三种,JSON 文件、CSV 文文件和列式存储的文件,同时可以通过添加参数,来识别不同的存储和压缩格式。

csv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class sql_source {
public static void main(String[] args) {
final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate();

// csv 文件讲数据采用逗号分隔,可以被excel打开
// csv 带有header,以,分隔. 可以以, _ \t 分隔 \t时叫做tsv
Dataset<Row> csv = sparkSession.read().option("header","true").option("sep",",").csv("data/user.csv");

// +----+--------+---+
//| _c0| _c1|_c2|
//+----+--------+---+
//|1001|zhangsan| 30|
//|1002| lisi| 31|
//|1003| wangwu| 32|
//+----+--------+---+

//+----+--------+---+
//| id| name|age|
//+----+--------+---+
//|1001|zhangsan| 30|
//|1002| lisi| 31|
//|1003| wangwu| 32|
//+----+--------+---+
//csv.show();

// 输出目录已经存在,默认会发生异常。不希望出错,可以修改配置 保存模式
csv.write().mode("append").option("header","true").csv("output");

// DataFrameWriter var2;
// if ("overwrite".equals(var4)) {
// var2 = this.mode(SaveMode.Overwrite);
// } else if ("append".equals(var4)) {
// var2 = this.mode(SaveMode.Append);
// } else if ("ignore".equals(var4)) {
// var2 = this.mode(SaveMode.Ignore);
// } else {
// boolean var3;
// if ("error".equals(var4)) {
// var3 = true;
// } else if ("errorifexists".equals(var4)) {
// var3 = true;
// } else if ("default".equals(var4)) {
// var3 = true;
// } else {
// var3 = false;
// }

sparkSession.close();
}
}

json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class sql_source_json {
public static void main(String[] args) {
final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate();


// js object natation
// 如果是对象,用{} 数组用[] json文件符合json格式
//SparkSQL其实就是对Spark Core RDD的封装。RDD读取文件采用用的是Hadoop,hadoop是按行读取。
//SparkSQL只需要保证JSON文件中一行数据符合JSON格式即可,无需整个文件符合JSON格式
// 底层是dataset,可以读csv,写json
Dataset<Row> rowDataset = sparkSession.read().json("data/user.json");

//rowDataset.show();

rowDataset.write().json("output");
sparkSession.close();
}
}

行式存储

存在主键,可以快速定位。查询快,统计慢

列式存储

查询快,统计快

mysql 行存储,hive 列存储

spark 列存储 parquet

列存储可以被压缩,snappy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ublic class sql_source_parquet {
public static void main(String[] args) {
final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate();


// js object natation
// 如果是对象,用{} 数组用[] json文件符合json格式
//SparkSQL其实就是对Spark Core RDD的封装。RDD读取文件采用用的是Hadoop,hadoop是按行读取。
//SparkSQL只需要保证JSON文件中一行数据符合JSON格式即可,无需整个文件符合JSON格式
// 底层是dataset,可以读csv,写json
Dataset<Row> rowDataset = sparkSession.read().parquet("data/user");

//rowDataset.show();

rowDataset.write().json("output");
sparkSession.close();
}
}
# mysql 交互
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
        <dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>


public class sql_source_mysql {
public static void main(String[] args) {
final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("sparkSQL").getOrCreate();


Properties properties = new Properties();
properties.setProperty("user","admin");
properties.setProperty("password","Chaitin@123");

Dataset<Row> jdbc = sparkSession.read().jdbc("jdbc:mysql://hadoop100:3306/metastore?useSSL=false","TYPES",properties);


jdbc.write().mode("append").jdbc("jdbc:mysql://hadoop100:3306/metastore?useSSL=false","TYPES_123",properties);
jdbc.show();
sparkSession.close();
}
}
# hive 交互

复制 hive-site.yaml 到 resources 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
        <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.1</version>
</dependency>



public class sql_hive {
public static void main(String[] args) {

// 编码前设定hadoop访问用户
System.setProperty("HADOOP_USER_NAME","root");
final SparkSession sparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("sparkSQL")
.getOrCreate();

sparkSession.sql("show tables").show();

sparkSession.sql("create table user_info(name string, age int)");
sparkSession.sql("insert into table user_info values ('haha',100)");
sparkSession.sql("select * from user_info").show();


sparkSession.close();
}
}

如果 reources 中有 hive-site.xml 但是 target/classes 中没有,需要手工拷贝到改目录

image-20240804224020783

# spark streaming

有界数据流

无界数据流

image-20240804224044892

image-20240804224058079

spark streaming 底层还是 spark core,在流式数据处理中进行的封装

从数据处理方式的角度

流式数据处理:一个数据一个数据的处理

微批量数据处理:一小批数据处理

批量数据处理:一批数据一批数据的处理

从数据处理延迟的角度

实时数据处理:数据处理的延迟以毫秒为单位

准实时处理:以秒和分钟为单位

离线数据处理:数据处理的延迟以小时,天为单位

Spark 是批量,离线数据处理框架

spark streaming 是个 微批量 准实时数据处理框架

streaming 按照时间来定义一小批

image-20240804224112151

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class env {
public static void main(String[] args) throws InterruptedException {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming");
// Spark在流式数据的处理场景中对核心功能环境进行了封装
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(3000));

// 启动数据采集器
javaStreamingContext.start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

// 数据采集器是长期执行的任务,不能停止,也不能释放资源
// javaStreamingContext.close();
}
}

socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
python -m http.server 8001

public class socket {
public static void main(String[] args) throws InterruptedException {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming");
// Spark在流式数据的处理场景中对核心功能环境进行了封装
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(3000));

// 通过环境对象对接socket数据源
JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("localhost", 9999);

socketTextStream.print();


// 启动数据采集器
javaStreamingContext.start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

}
}

kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

public class kafka {
public static void main(String[] args) throws InterruptedException {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming");
// Spark在流式数据的处理场景中对核心功能环境进行了封装
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(3000));


// 创建配置参数
HashMap<String, Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
map.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu");
map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

// 需要消费的主题
ArrayList<String> strings = new ArrayList<>();
strings.add("topic_db");

JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferBrokers(), ConsumerStrategies.<String, String>Subscribe(strings,map));

directStream.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> v1) throws Exception {
return v1.value();
}
}).print(100);


// 启动数据采集器
javaStreamingContext.start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

}
}

Dstream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class function {
public static void main(String[] args) throws InterruptedException {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming");
// Spark在流式数据的处理场景中对核心功能环境进行了封装
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(3000));

// 通过环境对象对接socket数据源
JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("localhost", 9999);

socketTextStream.print();

JavaDStream<String> stringJavaDStream = socketTextStream.flatMap(
line -> Arrays.asList(line.split(" ")).iterator()
);

JavaPairDStream<String, Integer> stringIntegerJavaPairDStream = stringJavaDStream.mapToPair(
word -> new Tuple2<>(word, 1)
);

JavaPairDStream<String, Integer> stringIntegerJavaPairDStream1 = stringIntegerJavaPairDStream.reduceByKey(
Integer::sum
);

stringIntegerJavaPairDStream1.foreachRDD(
rdd -> {
rdd.sortByKey().collect().forEach(System.out::println);
}
);

// 启动数据采集器
javaStreamingContext.start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

}
}

window

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
窗口是可以移动的,成为移动窗口,但是窗口移动是有复幅度的,默认移动幅度就是采集周期


窗口:其实就是数据的范围(时间)
window方法可以改变窗口的数据范围(默认数据范围为采集周期
window方法可以传递2个参数
第一个参数表示窗口的数据范围(时间)
第二个参数表示窗口的移动幅度(时间),可以不用传递,默认使用的就是采集周期


数据窗口范围和窗口移动幅度一致(3s),数据不会有重复

数据窗口范围比窗口移动幅度大,数据可能会有重复

数据窗口范围比窗口移动幅度小,数据可能会有遗漏

close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class close {
public static void main(String[] args) throws InterruptedException {

SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming");
// Spark在流式数据的处理场景中对核心功能环境进行了封装
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(3000));

// 通过环境对象对接socket数据源
JavaReceiverInputDStream<String> socketTextStream = javaStreamingContext.socketTextStream("localhost", 9999);

socketTextStream.print();


// 启动数据采集器
javaStreamingContext.start();

// 不能在main中关闭
new Thread(new Runnable() {
@Override
public void run() {
try {
// 关闭SparkStreaming的时候,需要在程序运行的过程中,通过外部操作进行关闭
Thread.sleep(000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
///javaStreamingContext.close(); // 强制关闭
javaStreamingContext.stop(true,true); //graceful stop
}
}).start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

}
}
new Thread(new Runnable() {
@Override
public void run() {
try {
// 关闭SparkStreaming的时候,需要在程序运行的过程中,通过外部操作进行关闭
Thread.sleep(3000);
// 使用zk,redis,mysql,hdfs实现中转
boolean flag = false;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
///javaStreamingContext.close(); // 强制关闭
javaStreamingContext.stop(true,true); //graceful stop
}
}).start();

// 等待数据采集器的结束,如果采集器停止运行,那么main线程会继续续执行
javaStreamingContext.awaitTermination();

}
}
public void run() {
try {
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "atguigu");
while (true){
Thread.sleep(5000);
boolean exists = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark"));
if (exists){
StreamingContextState state = javaStreamingContext.getState();
// 获取当前任务是否正在运行
if (state == StreamingContextState.ACTIVE){
// 优雅关闭
javaStreamingContext.stop(true, true);
System.exit(0);
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
Edited on

Give me a cup of [coffee]~( ̄▽ ̄)~*

John Doe WeChat Pay

WeChat Pay

John Doe Alipay

Alipay

John Doe PayPal

PayPal