INTRO
Apache Hadoop Distributed File System (HDFS) as the underlying file system and Apache Spark as the execution engine.
cd ~
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
tar zvxf hadoop-3.3.4.tar.gz
cd hadoop-3.3.4
我们需要编辑一些配置文件。对于HDFS,您可以在vm1上进行编辑,然后将文件复制到vm2。这些配置文件最初是空的(带有注释),因此用户需要手动设置它们。添加如下内容,替换空的<configuration>…</configuration>字段在hadoop-3.3.4/etc/hadoop/core-site.xml:
接下来,编辑hadoop-3.3.4/etc/hadoop/hdfs-site.xml中的文件系统目录。确保创建了文件夹并指定了正确的路径。例如,创建“hadoop-3.3.4/data/namenode/”和“hadoop-3.3.4/data/datanode”,分别设置为“dfs.namenode.name.dir”和“dfs.datanode.data.dir”字段值所在的路径(参考下面的模板hdfs-site.xml配置文件)。第三个配置属性表示HDFS块的大小,在本例中设置为64MB。(HDFS默认配置的块大小为128MB。)这些目录分别指明NameNode和DataNode的数据存储位置。注意,xml文件中的路径应该是绝对的;还需要注意的是,相同的路径需要您自己在领导实例vm1和跟随实例vm2上手动创建,因为HDFS将在集群的每台机器上运行一个DataNode进程。
Note that the path in the xml file should be absolute; also note that the same path needs to be manually created by yourself on both the leader instance vm1
and the follower instance vm2
, because HDFS will run a DataNode process on each machine of the cluster.
在这一部分,我们详细了解了 PageRank 算法的概念和计算方式,它是 Google 搜索引擎用来衡量网页质量和重要性的关键算法之一。PageRank 的基本思想是利用网络上的链接结构来评估网页的“重要性”或“权威性”,而这个重要性可以用来提升搜索结果的精准度。以下是对第 2.1 节的具体说明:
2.1 PageRank:为网络带来秩序
网络上的引用(链接)图是一个重要的资源,但在早期的网络搜索引擎中几乎没有被充分利用。Google 开发了一种称为 PageRank 的算法,来通过计算网页的“引用重要性”来组织搜索结果。这种引用重要性与人们的主观重要性评价相吻合,因此可以用来很好地对搜索结果进行排序。
2.1.1 PageRank 计算的描述
PageRank 源于学术论文引用的理论。通常我们会统计指向某个页面的引用(即反向链接)数量来近似衡量页面的“重要性”或“质量”。然而,PageRank 不仅仅是简单地计算链接数量,而是通过加权来衡量。
该公式表示:页面 A 的 PageRank 是其引用页面的 PageRank 值的归一化和,再加上一个常数。PageRank 值的总和构成了一个概率分布,因此所有页面的 PageRank 总和为 1。
计算方法:
PageRank 可以通过简单的迭代算法计算,最终收敛到网页的链接矩阵的主特征向量。计算约 2600 万网页的 PageRank 只需几小时。
2.1.2 直观的解释
PageRank 模型可以理解为用户行为的一种模拟。假设有一个“随机漫游者”,他会随机选择一个页面,不停地点击链接,不会点击“返回”,但在某个页面会感到“厌倦”并随机跳到另一个页面。这种随机跳转的概率就是阻尼系数 ddd。
PageRank 的重要特性在于,即使一个页面只有少数高质量页面指向它,或有很多普通页面指向它,它也可以得到较高的 PageRank。这表明,一个被广泛引用的页面是值得关注的,而即使只有一个重要页面(如 Yahoo 的主页)指向的页面也是值得关注的。
PageRank 算法通过递归地将权重在网络结构中传播,从而综合考虑了各种情况,包括广泛引用的页面和少数高质量引用的页面。这种算法使得搜索引擎可以避免人为操纵,提高搜索结果的客观性和可靠性。
你的任务包括以下三个主要部分:
任务 1:编写 PySpark 程序实现 PageRank 算法
- 初始化页面排名:给每个页面的初始排名设为 1。
- 迭代过程:
- 每一轮迭代中,每个页面 ppp 会把它的排名值 rank(p)\text{rank}(p)rank(p) 按照出链接数均分,贡献给每个出链接指向的页面。
- 每个页面的排名更新公式是:rank(p)=0.15+0.85×(contributions sum)\text{rank}(p) = 0.15 + 0.85 \times (\text{contributions sum})rank(p)=0.15+0.85×(contributions sum)。
- 执行 10 轮迭代:按照上述步骤迭代计算 PageRank 值,共执行 10 次迭代。
具体步骤:
- 下载并解压缩数据集(如从 BlackBoard 下载),然后将解压后的文件上传到 HDFS。
- 编写 PySpark 代码来读取该数据集,初始化每个页面的排名,然后按上述逻辑实现 PageRank 算法。
任务 2:调整 RDD 分区以提高并行度
在 Spark 中,数据集会被分割成多个小块(称为“分区”),分布在集群的不同节点上。你可以通过多种方式来调整分区,以实现更高的并行度:
- 可以在创建 RDD 时配置分区方式。
- 还可以在进行像 join 这样会导致数据洗牌的操作时调整分区。
你可以使用 partitionBy()
函数自定义 RDD 的分区方式,并观察改变分区对程序执行效率的影响。
任务 3:杀掉一个 Worker 进程并观察变化
为了模拟节点故障并观察 Spark 的容错性,你需要在应用执行到 25% 到 75% 的时间段时中途故意杀掉一个 Worker 进程,具体步骤如下:
- 在 vm2 上执行以下命令,清空内存缓存:
-
sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"
- 使用
jps
命令获取 Spark Worker 的进程 ID,然后使用kill -9 <Worker_PID>
终止该进程。
这个实验会帮助你理解 Spark 的容错机制,以及在分布式计算中遇到节点故障时 Spark 如何自动重新调度任务。
总结:
- 任务 1:在 PySpark 中实现 PageRank 算法。
- 任务 2:通过自定义分区来提高并行度。
- 任务 3:杀掉一个 Worker 进程,观察容错效果。
注意点?
如果你的当前设置只有一个 Spark Worker,那么 Task 3 可能无法完成原本的故障模拟任务。Task 3 的目的是模拟 Worker 故障,以便观察 Spark 集群在 Worker 出现故障后的行为。这通常需要至少两个 Worker 才能测试系统的故障恢复能力。
你可以尝试在 spark-env.sh
中增加 SPARK_WORKER_INSTANCES
或配置更多的 VM,来添加一个新的 Worker。这样就可以在任务执行中关闭一个 Worker,并观察集群的故障处理过程。如果需要帮助增加一个 Worker 或设置 SPARK_WORKER_INSTANCES
scp D:\Broswer\web-BerkStan.gz ubuntu@44.198.192.38:~/
文件上传到服务器后,你可以:
-
使用
gunzip
解压缩该文件: -
gunzip ~/web-BerkStan.gz
将解压后的文件上传到 HDFS,以便在 Spark 应用中进行处理:
hdfs dfs -copyFromLocal ~/web-BerkStan /web-BerkStan
ubuntu@ip-172-31-12-45:~$ hdfs dfs -copyFromLocal ~/web-BerkStan /web-BerkStan
copyFromLocal: Call From ip-172-31-12-45.ec2.internal/172.31.12.45 to ip-172-31-12-45.ec2.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
ubuntu@ip-172-31-12-45:~$ jps
1724 Jps
ubuntu@ip-172-31-12-45:~$
需要先启动 NameNode 和 DataNode 服务:
start-dfs.sh
动成功后,再次尝试将文件上传到 HDFS:
在成功上传数据集后,您可以开始在 Spark 中实现 PageRank 算法。以下是实现步骤:
1. 编写 PageRank 程序
在 vm1
上,创建一个 Python 脚本文件(例如 pagerank.py
),内容如下:
在 vm1
上创建一个 Python 脚本文件,可以使用以下步骤:
-
使用
touch
命令创建空文件: 在终端中执行以下命令创建一个名为pagerank.py
的空文件:touch pagerank.py
-
使用 VS Code 或其他文本编辑器编辑文件: 如果您在使用 VS Code,可以直接通过 VS Code 打开
pagerank.py
文件进行编辑。确保您的 VS Code 已连接到vm1
。 -
或者使用
nano
编辑器在终端中编辑: 在终端中使用nano
编辑器打开文件
为了完成 Part 3 的 Task 2,您需要在代码中实现自定义的 RDD 分区,以观察它对计算性能和资源分配的影响。自定义 RDD 分区可以让您更好地控制数据在集群中的分布,进而提高并行处理的效率,尤其是在大数据集和集群环境中。
什么是 RDD 分区?
在 Spark 中,RDD(弹性分布式数据集)是分区的。每个分区可以在不同的 Worker 上并行处理,分区数越多,可以让集群中更多的 Worker 来并行处理数据。
通过自定义 RDD 分区,您可以控制数据的分布方式,进而影响集群资源的利用情况。这样,Spark 会在集群中根据自定义分区来平衡任务,从而让每个 Worker 都得到合理的任务量。
增加一个新的 Worker 确实可以将任务分配到该 Worker 上,从而提升并行处理能力,特别是当数据集较大且分区数合理时。通过增加 Worker 数量,Spark 会根据数据的分区将任务自动调度到所有可用的 Worker 上,以充分利用集群资源。
确保新 Worker 能参与任务的步骤
-
确保 RDD 分区数足够:
- 在代码中使用
partitionBy(num_partitions)
设置足够的分区数(通常设置为 Worker 数量的 2-4 倍,具体数值可根据集群性能调整)。 - 分区数越多,越能让 Spark 把任务分配到更多的 Worker 上。
- 在代码中使用
-
重新提交任务:
- 确保所有 Worker 都正常启动后,重新提交 Spark 程序:
spark-3.3.1-bin-hadoop3/bin/spark-submit pagerank.py
- 确保所有 Worker 都正常启动后,重新提交 Spark 程序: