INTRO

Apache Hadoop Distributed File System (HDFS) as the underlying file system and Apache Spark as the execution engine.

cd ~
wget https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
tar zvxf hadoop-3.3.4.tar.gz
cd hadoop-3.3.4

我们需要编辑一些配置文件。对于HDFS,您可以在vm1上进行编辑,然后将文件复制到vm2。这些配置文件最初是空的(带有注释),因此用户需要手动设置它们。添加如下内容,替换空的<configuration>…</configuration>字段在hadoop-3.3.4/etc/hadoop/core-site.xml:

接下来,编辑hadoop-3.3.4/etc/hadoop/hdfs-site.xml中的文件系统目录。确保创建了文件夹并指定了正确的路径。例如,创建“hadoop-3.3.4/data/namenode/”和“hadoop-3.3.4/data/datanode”,分别设置为“dfs.namenode.name.dir”和“dfs.datanode.data.dir”字段值所在的路径(参考下面的模板hdfs-site.xml配置文件)。第三个配置属性表示HDFS块的大小,在本例中设置为64MB。(HDFS默认配置的块大小为128MB。)这些目录分别指明NameNode和DataNode的数据存储位置。注意,xml文件中的路径应该是绝对的;还需要注意的是,相同的路径需要您自己在领导实例vm1和跟随实例vm2上手动创建,因为HDFS将在集群的每台机器上运行一个DataNode进程。

Note that the path in the xml file should be absolute; also note that the same path needs to be manually created by yourself on both the leader instance vm1 and the follower instance vm2, because HDFS will run a DataNode process on each machine of the cluster.

在这一部分,我们详细了解了 PageRank 算法的概念和计算方式,它是 Google 搜索引擎用来衡量网页质量和重要性的关键算法之一。PageRank 的基本思想是利用网络上的链接结构来评估网页的“重要性”或“权威性”,而这个重要性可以用来提升搜索结果的精准度。以下是对第 2.1 节的具体说明:

2.1 PageRank:为网络带来秩序

网络上的引用(链接)图是一个重要的资源,但在早期的网络搜索引擎中几乎没有被充分利用。Google 开发了一种称为 PageRank 的算法,来通过计算网页的“引用重要性”来组织搜索结果。这种引用重要性与人们的主观重要性评价相吻合,因此可以用来很好地对搜索结果进行排序。

2.1.1 PageRank 计算的描述

PageRank 源于学术论文引用的理论。通常我们会统计指向某个页面的引用(即反向链接)数量来近似衡量页面的“重要性”或“质量”。然而,PageRank 不仅仅是简单地计算链接数量,而是通过加权来衡量。

[云] big data analytics stacks-LMLPHP

该公式表示:页面 A 的 PageRank 是其引用页面的 PageRank 值的归一化和,再加上一个常数。PageRank 值的总和构成了一个概率分布,因此所有页面的 PageRank 总和为 1。

计算方法:

PageRank 可以通过简单的迭代算法计算,最终收敛到网页的链接矩阵的主特征向量。计算约 2600 万网页的 PageRank 只需几小时。

2.1.2 直观的解释

PageRank 模型可以理解为用户行为的一种模拟。假设有一个“随机漫游者”,他会随机选择一个页面,不停地点击链接,不会点击“返回”,但在某个页面会感到“厌倦”并随机跳到另一个页面。这种随机跳转的概率就是阻尼系数 ddd。

PageRank 的重要特性在于,即使一个页面只有少数高质量页面指向它,或有很多普通页面指向它,它也可以得到较高的 PageRank。这表明,一个被广泛引用的页面是值得关注的,而即使只有一个重要页面(如 Yahoo 的主页)指向的页面也是值得关注的。

PageRank 算法通过递归地将权重在网络结构中传播,从而综合考虑了各种情况,包括广泛引用的页面和少数高质量引用的页面。这种算法使得搜索引擎可以避免人为操纵,提高搜索结果的客观性和可靠性。

你的任务包括以下三个主要部分:

任务 1:编写 PySpark 程序实现 PageRank 算法

  1. 初始化页面排名:给每个页面的初始排名设为 1。
  2. 迭代过程
    • 每一轮迭代中,每个页面 ppp 会把它的排名值 rank(p)\text{rank}(p)rank(p) 按照出链接数均分,贡献给每个出链接指向的页面。
    • 每个页面的排名更新公式是:rank(p)=0.15+0.85×(contributions sum)\text{rank}(p) = 0.15 + 0.85 \times (\text{contributions sum})rank(p)=0.15+0.85×(contributions sum)。
  3. 执行 10 轮迭代:按照上述步骤迭代计算 PageRank 值,共执行 10 次迭代。

具体步骤:

  • 下载并解压缩数据集(如从 BlackBoard 下载),然后将解压后的文件上传到 HDFS。
  • 编写 PySpark 代码来读取该数据集,初始化每个页面的排名,然后按上述逻辑实现 PageRank 算法。

任务 2:调整 RDD 分区以提高并行度

在 Spark 中,数据集会被分割成多个小块(称为“分区”),分布在集群的不同节点上。你可以通过多种方式来调整分区,以实现更高的并行度:

  • 可以在创建 RDD 时配置分区方式。
  • 还可以在进行像 join 这样会导致数据洗牌的操作时调整分区。

你可以使用 partitionBy() 函数自定义 RDD 的分区方式,并观察改变分区对程序执行效率的影响。

任务 3:杀掉一个 Worker 进程并观察变化

为了模拟节点故障并观察 Spark 的容错性,你需要在应用执行到 25% 到 75% 的时间段时中途故意杀掉一个 Worker 进程,具体步骤如下:

  1. vm2 上执行以下命令,清空内存缓存:
  2. sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"

  3. 使用 jps 命令获取 Spark Worker 的进程 ID,然后使用 kill -9 <Worker_PID> 终止该进程。

这个实验会帮助你理解 Spark 的容错机制,以及在分布式计算中遇到节点故障时 Spark 如何自动重新调度任务。

总结:

  • 任务 1:在 PySpark 中实现 PageRank 算法。
  • 任务 2:通过自定义分区来提高并行度。
  • 任务 3:杀掉一个 Worker 进程,观察容错效果。
注意点?

如果你的当前设置只有一个 Spark Worker,那么 Task 3 可能无法完成原本的故障模拟任务。Task 3 的目的是模拟 Worker 故障,以便观察 Spark 集群在 Worker 出现故障后的行为。这通常需要至少两个 Worker 才能测试系统的故障恢复能力。

你可以尝试在 spark-env.sh 中增加 SPARK_WORKER_INSTANCES 或配置更多的 VM,来添加一个新的 Worker。这样就可以在任务执行中关闭一个 Worker,并观察集群的故障处理过程。如果需要帮助增加一个 Worker 或设置 SPARK_WORKER_INSTANCES

scp D:\Broswer\web-BerkStan.gz ubuntu@44.198.192.38:~/

文件上传到服务器后,你可以:

  • 使用 gunzip 解压缩该文件:

  • gunzip ~/web-BerkStan.gz
    

    将解压后的文件上传到 HDFS,以便在 Spark 应用中进行处理:

hdfs dfs -copyFromLocal ~/web-BerkStan /web-BerkStan
ubuntu@ip-172-31-12-45:~$ hdfs dfs -copyFromLocal ~/web-BerkStan /web-BerkStan 
copyFromLocal: Call From ip-172-31-12-45.ec2.internal/172.31.12.45 to ip-172-31-12-45.ec2.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
ubuntu@ip-172-31-12-45:~$ jps
1724 Jps
ubuntu@ip-172-31-12-45:~$ 

需要先启动 NameNode 和 DataNode 服务:

start-dfs.sh

动成功后,再次尝试将文件上传到 HDFS: 

在成功上传数据集后,您可以开始在 Spark 中实现 PageRank 算法。以下是实现步骤:

1. 编写 PageRank 程序

vm1 上,创建一个 Python 脚本文件(例如 pagerank.py),内容如下:

vm1 上创建一个 Python 脚本文件,可以使用以下步骤:

  1. 使用 touch 命令创建空文件: 在终端中执行以下命令创建一个名为 pagerank.py 的空文件:

    touch pagerank.py
    
  2. 使用 VS Code 或其他文本编辑器编辑文件: 如果您在使用 VS Code,可以直接通过 VS Code 打开 pagerank.py 文件进行编辑。确保您的 VS Code 已连接到 vm1

  3. 或者使用 nano 编辑器在终端中编辑: 在终端中使用 nano 编辑器打开文件

为了完成 Part 3 的 Task 2,您需要在代码中实现自定义的 RDD 分区,以观察它对计算性能和资源分配的影响。自定义 RDD 分区可以让您更好地控制数据在集群中的分布,进而提高并行处理的效率,尤其是在大数据集和集群环境中。

什么是 RDD 分区?

在 Spark 中,RDD(弹性分布式数据集)是分区的。每个分区可以在不同的 Worker 上并行处理,分区数越多,可以让集群中更多的 Worker 来并行处理数据。

通过自定义 RDD 分区,您可以控制数据的分布方式,进而影响集群资源的利用情况。这样,Spark 会在集群中根据自定义分区来平衡任务,从而让每个 Worker 都得到合理的任务量。

增加一个新的 Worker 确实可以将任务分配到该 Worker 上,从而提升并行处理能力,特别是当数据集较大且分区数合理时。通过增加 Worker 数量,Spark 会根据数据的分区将任务自动调度到所有可用的 Worker 上,以充分利用集群资源。

确保新 Worker 能参与任务的步骤

  1. 确保 RDD 分区数足够

    • 在代码中使用 partitionBy(num_partitions) 设置足够的分区数(通常设置为 Worker 数量的 2-4 倍,具体数值可根据集群性能调整)。
    • 分区数越多,越能让 Spark 把任务分配到更多的 Worker 上。
  2. 重新提交任务

     
    • 确保所有 Worker 都正常启动后,重新提交 Spark 程序:
      spark-3.3.1-bin-hadoop3/bin/spark-submit pagerank.py
      
11-10 01:33