连接预排序和预分区过的数据
Map-side joins 是最有效的技术,前面的两种 map-side 策略都要求其中有一个数据集可被加载到内存。但是,如果两个数据集都很大且无法”瘦身”而无法做到这一点时,该怎么办?在这种情况下,如果满足以下条件,则可以使用复合的 map-side join:
两个数据集都无法整体加载到内存中.
两个数据集都按 join key 排好了序
每个数据集都有相同的文件数.
在每个数据集中的 File N 都包含相同的 join key K.
每个文件的大小都小于一个 HDFS block,这样分区时不...
Map-side joins 是最有效的技术,前面的两种 map-side 策略都要求其中有一个数据集可被加载到内存。但是,如果两个数据集都很大且无法”瘦身”而无法做到这一点时,该怎么办?在这种情况下,如果满足以下条件,则可以使用复合的 map-side join:
两个数据集都无法整体加载到内存中.
两个数据集都按 join key 排好了序
每个数据集都有相同的文件数.
在每个数据集中的 File N 都包含相同的 join key K.
每个文件的大小都小于一个 HDFS block,这样分区时不会被 split者,或者用于该数据的 input split不会切分该文件。
下图显示了一个排序的和分区的文件的例子,这些文件可以用于复合连接。
应用场景:
想要在排序的、分区的数据上执行一个 map-side join。解决方案:
使用 MapReduce 自带的 CompositeInputFormat。CompositeInputFormat 功能相当强大,并且支持内连接和外连接。
半连接(Semi-join)
假设一个场景,需要连接两个很大的数据集,例如,用户日志和 OLTP 的用户数据。任何一个数据集都不是足够小到可以缓存在 map 作业的内存中。可以思考以下问题:如果在数据集的连接操作中,一个数据集中有的记录由于因为无法连接到另一个数据集的记录,将会被移除。这样还需要将整个数据集放到内存中吗?
在这个例子中,在用户日志中的用户仅仅是 OLTP 用户数据中的用户中的很小的一部分。那么就可以从 OLTP 用户数据中只取出存在于用户日志中的那部分用户的用户数据。然后就可以得到足够小到可以放在内存中的数据集...
假设一个场景,需要连接两个很大的数据集,例如,用户日志和 OLTP 的用户数据。任何一个数据集都不是足够小到可以缓存在 map 作业的内存中。可以思考以下问题:如果在数据集的连接操作中,一个数据集中有的记录由于因为无法连接到另一个数据集的记录,将会被移除。这样还需要将整个数据集放到内存中吗?
在这个例子中,在用户日志中的用户仅仅是 OLTP 用户数据中的用户中的很小的一部分。那么就可以从 OLTP 用户数据中只取出存在于用户日志中的那部分用户的用户数据。然后就可以得到足够小到可以放在内存中的数据集。这种的解决方案就叫做半连接。
应用场景:
需要连接两个都很大的数据集,同时避免经过 shuffle 和 sort 阶段。解决方案:
在这个技术中,将会用到三个 MapReduce 作业来连接两个数据集,以此来减少 reduce 端连接的消耗。这个技术在这种场景下非常有用:连接两个很大的数据集,但是可以通过过滤与另一个数据集不匹配的记录来减少数据的大小,使得可以放入 task 的内存中。
下图说明了在半连接中将要执行的三个 MapReduce 作业(Job)。
[例]使用半连接。
准备数据集:
有两个数据集 logs.txt 和 users.txt。其中 users.txt 中为用户数据,包括用户名、年龄和所在地区;logs.txt为基于用户的一些活动(可从应用程序或 web 服务器日志中抽取出来),包括用户名、活动、源 IP 地址。
文件 users.txt:
文件 logs.txt:
JOB 1:
第一个 MapReduce job 的功能是从日志文件中提取出用户名,用这些用户名生成一个用户名唯一的集合(Set)。这通过在 map 函数执行用户名的投影操作来实现,并反过来使用 reducer 来产生这些用户名。为了减少在 map 阶段和 reduce 阶段之间传输的数据量,采用如下方法:在 map 任务中采用哈希集 HashSet来缓存所有的用户名,并在 cleanup 方法中输出该 HashSet 的值。下图说明了这个 job 的流程:
作业1的代码:
作业 1 的结果就是来自于日志文件中的所有用户的集合。集合中的用户名是唯一的。
Job2:
第二步是一个复杂的过滤 MapReduce job,目标是从全体用户的用户数据集中移除不存在于日志文件中的用户。这是一个 map-only job,它使用一个复制连接来缓存出现在日志文件中的用户名,并把他们和用户数据集进行连接。由于 job 1 输出的唯一用户的数据集实际上要远远小于整个用户数据集,所以很自然地就把来自 job 1 的唯一用户集放到缓存中了。下图说明了这个作业的流程:
这是一个复制连接,与上一节学习的复制连接一样。
Job 2 的 mapper 代码如下:(注意,要先上传 job1 的输出文件 part-r-00000 到分布式缓存)
作业 2 的输出就是已被用户日志数据集的用户名过滤过的用户集了。
Job 3:
在这最后一步中,我们将合并从 job 2 输出的过滤后的用户和原始的用户日志。现在被过滤后的用户已经小到可以驻留在内存中了,这样就可以将它们放入分布式缓存中。下图演示了这个 job 的流程:
小结:
这一节学习了怎样使用一个半连接(semi-join)来合并两个数据集。半连接结构包含比其他连接更多的步骤,但是当处理大数据集时(其中有一个数据集必须可被消减到适合放入内存的大小),使用半连接是很给力的方式。
Map 端连接(Map-side joins)
作为一般策略,首选是 map-side 连接。这里,我们探讨三种不同风格的 map-side joins:
有一个数据集小到足够放入内存缓存;
有一个数据集经过过滤后可以放入内存缓存中;(在两个数据集中都存在 join key);
数据被排序并以某种方式跨文件分发。
方式一:连接数据,其中有一个数据集小到足够放入内存缓存
复制连接是 map-side...
作为一般策略,首选是 map-side 连接。这里,我们探讨三种不同风格的 map-side joins:
有一个数据集小到足够放入内存缓存;
有一个数据集经过过滤后可以放入内存缓存中;(在两个数据集中都存在 join key);
数据被排序并以某种方式跨文件分发。
方式一:连接数据,其中有一个数据集小到足够放入内存缓存
复制连接是 map-side join,复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的 map 主机节点。复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。使用分布式缓存来缓存较小的那个数据集,当较大的那个数据集流向 mapper 时与其执行接。
MapReduce 复制连接工作原理如下:
使用分布式缓存(Districubted cache)将这个小数据集复制到所有运行 map 任务的节点。
用各个 map 任务初始化方法将这个小数据集装载到一个哈希表(hashtable)中。
使用输入到 map 函数的大数据集的每条记录的 key 来查找这个小数据集的哈希表,并在这个大数据集记录和匹配该连接值的小数据集的所有记录间执行一个连接。
输出符合连接条件的结果。
示例:实现复制连接(map-join)。
题目:
有两个数据集 mylogs_tsv.txt 和 ip_country_tsv.txt。其中 mylogs_tsv.txt 中数据为 web 网站日志记录,ip_country_tsv.txt 中数据为 ip 地址和国家代码映射信息。
按照以下步骤操作:
我们先写mapper类:
然后再写我们的job类:
country.txt:
mylogXX.txt:
进行集群提交:
输出结果格式:
说明:
- 结论:So you're supposed to just open the file, it will be there. No dedicated API.- 使用 Hadoop 2.x 的分布式缓存,需要在 hadoop 平台上运行 MR 程序
如果所有的输入数据集都不能够小到可以放到缓存中,那有没有办法来优化 map 端连接呢?那就需要使用半连接(semi-join)了。
这就是简单的Map-side join 需要资料加qq:86497564 标明乐乎
hadoop_mr_表连接
连接(Join)是关系运算,可以用于合并关系(relation)。对于数据库中的表连接操作,可能已经广为人知了。在 MapReduce 中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情信息。用户基本信息来自于 OLTP 数据库。用户活动详情信息来自于日志文件。
MapReduce 的连接操作可以用于以下场景:
用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)。
当用户超过一定时间没有使用网站后,发邮件提醒他们。(这个一定时间的阈值是用户自己预定义的)
分析...
连接(Join)是关系运算,可以用于合并关系(relation)。对于数据库中的表连接操作,可能已经广为人知了。在 MapReduce 中,连接可以用于合并两个或多个数据集。例如,用户基本信息和用户活动详情信息。用户基本信息来自于 OLTP 数据库。用户活动详情信息来自于日志文件。
MapReduce 的连接操作可以用于以下场景:
用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)。
当用户超过一定时间没有使用网站后,发邮件提醒他们。(这个一定时间的阈值是用户自己预定义的)
分析用户的浏览习惯。让系统可以基于这个分析提示用户有哪些网站特性还没有使用到。进而形成一个反馈循环。
所有这些场景都要求将多个数据集连接起来。
最常用的两个连接类型是内连接(inner join)和外连接(outer join)。如下图所示,内连接比较两个关系中所有的元组,判断是否满足连接条件,然后生成一个满足连接条件的结果集。与内连接相反的是,外连接并不需要两个关系的元组都满足连接条件。在连接条件不满足的时候,外连接可以将其中一方的数据保留在结果集中。
为了实现内连接和外连接,MapReduce 中有三种连接策略,如下所示。这三种连接策略有的在 map阶段,有的在 reduce 阶段。它们都针对 MapReduce 的排序-合并的架构进行了优化。
重分区连接(Repartition join)— reduce 端连接。使用场景:连接两 个或多个大型数据集。
复制连接(Replication join)— map 端连接。使用场景:待连接的数 据集中有一个数据集足够
半连接(Semi-join)— 另一个 map 端连接。使用场景:待连接的数据 集中有一个数据集非常
2,选择最佳连接策略
要选择连接数据的最优方法,我们这里使用数据驱动的决策树来选择最佳连接策略。
这个决策树可以总结以下三点:
如果其中有一个数据集小到足够放入到一个 mapper 的内存,则 map only 复制连接最有效。
如果两个数据集都很大,其中一个数据集可通过预过滤(与其它数据集数据不匹配的)元素而大大减少体积,则 semi-join(半连接)最合适。
如果不能对数据进行预处理,并且数据体积太大而不能被缓存—这意味着我们不得不在 reducer端执行 join 连接—需要使用重分区连接(repartition join)
不管应用哪种策略,我们在 join 中应该执行的最基本的活动是使用过滤和投影。
3,过滤和投影
使用过滤和投影减少处理的数据量,使用下推优化技术来改善数据管道。过滤和投影工作原理如下图所示:
应该尽可能地靠近数据源执行过滤和投影;在 MapReduce 中,最好是在 mapper 中执行这个工作。例如下面的代码执行过滤:
投影和谓词下推进一步将过滤推进到存储格式。这甚至更高效,特别是使用基于下推可以 skip 过记录或 blocks 的存储格式时。
下表列出了各种存储格式以及是否支持下推:
使用 HDFS Java API
HDFS 是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。
Java 程序使用 HDFS Java API 与 HDFS 交互。使用这个 API,我们可以从 Java 程序中使用存储在 HDFS 中的数据,以及使用其它非 Hadoop 计算框架来处理这些数据。
有时,也会遇见这种情况-想要从一个 MapReduce 应用程序中直接访问 HDFS。不过,如果是直接从一个 Map 或 Reduce 任务中写入或修...
HDFS 是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。
Java 程序使用 HDFS Java API 与 HDFS 交互。使用这个 API,我们可以从 Java 程序中使用存储在 HDFS 中的数据,以及使用其它非 Hadoop 计算框架来处理这些数据。
有时,也会遇见这种情况-想要从一个 MapReduce 应用程序中直接访问 HDFS。不过,如果是直接从一个 Map 或 Reduce 任务中写入或修改 HDFS 中的文件,请注意这违背了MapReduce 的 side-effect-free(无副作用)特性,这有可能导致数据一致性问题。
对分布在 HDFS 中的文件操作主要涉及一下几个类:
Configuration类:该类的对象封装了客户端或者服务器的配置。
FileSystem类:该类的对象代表一个文件系统对象,可以用该对象的一些方法来对文件进行操作。
FileSystem fs = FileSystem.get(conf); // 通过 FileSystem 类的静态方法 get 获得该对象
FSDataInputStream 和 FSDataOutputStream:这两个类是 HDFS 中的输入输出流。分别通过 FileSystem 的 open 方法和 create 方法获得。
Path:代表文件或文件对象。
下面这个示例演示了怎样通过 Java 程序使用 HDFS Java API 来执行在 HDFS 上的文件系统操作。
【示例】通过 Java 程序使用 HDFS Java API 操作 HDFS 文件系统。请按以下步骤执行:
1、在 Eclipse 中新创建一个名为 HDFSJavaAPIExample 的 project;
2、在项目的 src 目录下创建一个名为 HDFSJavaAPIDemo.java 的源文件,该 java 程序会在 HDFS 中创建一个新的文件,并在其中写入一些文本,然后从 HDFS 读取回这个文件。编辑代码如下:
HDFS 分布式文件系统的 JAVA-API 提供了丰富的访问接口。主要包括:目录的创建、列表、查询、删除和文件的创建(写入)、读取等。 下面这个示例演示了其中一些方法:
程序主类:
程序的HDFSUtil类:
hdfs_shell操作
1,ls
hadoop fs -ls /
列出 hdfs 文件系统根目录下的目录和文件
hadoop fs -ls -R /
列出 hdfs 文件系统所有的目录和文件
2.put
hadoop fs -put < local file > < hdfs file >...
1,ls
hadoop fs -ls /
列出 hdfs 文件系统根目录下的目录和文件
hadoop fs -ls -R /
列出 hdfs 文件系统所有的目录和文件
2.put
hadoop fs -put < local file > < hdfs file >
hdfs file 的父目录一定要存在,否则命令不会执行hadoop fs -put < local file or dir >...< hdfs dir >
hdfs dir 一定要存在,否则命令不会执行hadoop fs -put - < hdsf file>
从键盘读取输入到 hdfs file 中,按 Ctrl+D 结束输入,hdfs file 不能存在,否则命令不会执行
2.1.moveFromLocal
hadoop fs -moveFromLocal < local src > ... < hdfs dst >
与 put 相类似,命令执行后源文件 local src 被删除,也可以从从键盘读取输入到 hdfs file 中
2.2.copyFromLocal
hadoop fs -copyFromLocal < local src > ... < hdfs dst >
与 put 相类似,也可以从从键盘读取输入到 hdfs file 中
3.get
hadoop fs -get < hdfs file > < local file or dir>
local file 不能和 hdfs file 名字不能相同,否则会提示文件已存在,没有重名的文件会复制到本地
hadoop fs -get < hdfs file or dir > ... < local dir >
拷贝多个文件或目录到本地时,本地要为文件夹路径
注意:如果用户不是 root, local 路径要为用户文件夹下的路径,否则会出现权限问题,
3.2.copyToLocal
hadoop fs -copyToLocal < local src > ... < hdfs dst >
与 get 相类似
4.rm
hadoop fs -rm < hdfs file > ...
hadoop fs -rm -r < hdfs dir>...
每次可以删除多个文件或目录
5.mkdir
hadoop fs -mkdir < hdfs path>
只能一级一级的建目录,父目录不存在的话使用这个命令会报错
hadoop fs -mkdir -p < hdfs path>
所创建的目录如果父目录不存在就创建该父目录
6.getmerge
hadoop fs -getmerge < hdfs dir > < local file >
将 hdfs 指定目录下所有文件排序后合并到 local 指定的文件中,文件不存在时会自动创建,文件存在时会覆盖里面的内容
hadoop fs -getmerge -nl < hdfs dir > < local file >
加上 nl 后,合并到 local file 中的 hdfs 文件之间会空出一行
7.cp
hadoop fs -cp < hdfs file > < hdfs file >
目标文件不能存在,否则命令不能执行,相当于给文件重命名并保存,源文件还存在
hadoop fs -cp < hdfs file or dir >... < hdfs dir >
目标文件夹要存在,否则命令不能执行
8.mv
hadoop fs -mv < hdfs file > < hdfs file >
目标文件不能存在,否则命令不能执行,相当于给文件重命名并保存,源文件不存在
hadoop fs -mv < hdfs file or dir >... < hdfs dir >
源路径有多个时,目标路径必须为目录,且必须存在。
注意:跨文件系统的移动(local 到 hdfs 或者反过来)都是不允许的
9.count
hadoop fs -count < hdfs path >
统计 hdfs 对应路径下的目录个数,文件个数,文件总计大小
显示为目录个数,文件个数,文件总计大小,输入路径
10.du
hadoop fs -du < hdsf path>
显示 hdfs 对应路径下每个文件夹和文件的大小
hadoop fs -du -s < hdsf path>
显示 hdfs 对应路径下所有文件和的大小
hadoop fs -du - h < hdsf path>
显示 hdfs 对应路径下每个文件夹和文件的大小,文件的大小用方便阅读的形式表示,例如用 64M 代替 67108864
11.text
hadoop fs -text < hdsf file>
将文本文件或某些格式的非文本文件通过文本格式输出
12.setrep
hadoop fs -setrep -R 3 < hdfs path >
改变一个文件在 hdfs 中的副本个数,上述命令中数字 3 为所设置的副本个数,-R 选项可以对一个人目录下的所有目录+文件递归执行改变副本个数的操作
13.stat
hdoop fs -stat [format] < hdfs path >
返回对应路径的状态信息
[format]可选参数有:%b(文件大小),%o(Block 大小),%n(文件名),%r(副本个数),%y(最后一次修改日期和时间)
可以这样书写 hadoop fs -stat %b%o%n < hdfs path >,不过不建议,这样每个字符输出的结果不是太容易分清楚
14.tail
hadoop fs -tail < hdfs file >
在标准输出中显示文件末尾的 1KB 数据
15.archive
hadoop archive -archiveName name.har -p < hdfs parent dir > < src >* < hdfs dst >
命令中参数 name:压缩文件名,自己任意取;< hdfs parent dir > :压缩文件所在的父目录;<src >*:要压缩的文件名;< hdfs dst >:压缩文件存放路径
示例:hadoop archive -archiveName hadoop.har -p /user 1.txt 2.txt /des
示例中将 hdfs 中/user 目录下的文件 1.txt,2.txt 压缩成一个名叫 hadoop.har 的文件存放在 hdfs中/des 目录下,如果 1.txt,2.txt 不写就是将/user 目录下所有的目录和文件压缩成一个名叫hadoop.har 的文件存放在 hdfs 中/des 目录下
显示 har 的内容可以用如下命令:
hadoop fs -ls /des/hadoop.jar
显示 har 压缩的是那些文件可以用如下命令
hadoop fs -ls -R har:///des/hadoop.har
**注意:**har 文件不能进行二次压缩。如果想给.har 加文件,只能找到原来的文件,重新创建一个。
har 文件中原来文件的数据并没有变化,har 文件真正的作用是减少 NameNode 和 DataNode 过多的空间浪费
16.balancer
hdfs balancer
如果管理员发现某些 DataNode 保存数据过多,某些 DataNode 保存数据相对较少,可以使用上述命令手动启动内部的均衡过程
17.dfsadmin
hdfs dfsadmin -help
管理员可以通过 dfsadmin 管理 HDFS,用法可以通过上述命令查看
hdfs dfsadmin -report
显示文件系统的基本数据
hdfs dfsadmin -safemode < enter | leave | get | wait >
enter:进入安全模式;leave:离开安全模式;get:获知是否开启安全模式;
wait:等待离开安全模式
18.distcp
用来在两个 HDFS 之间拷贝数据
hadoop distcp hdfs://caozhan:9000/a hdfs://cao:9000/ca
命令行中还可以指定多个源目录:
hadoop distcp hdfs://caozhan:9000/a hdfs://caozhan:9000/a hdfs://cao:9000/ca
或者使用-f选项,从文件里获得多个源:
hadoop distcp -f hdfs://caozhan:9000/dirlist hdfs://caozhan:9000/d
其中dirlist下是:
hadoop_yarn内存调优
(1) yarn.nodemanager.resource.memory-mb
表示在该节点上yarn可使用的物理内存数量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要减小这个值,而yarn不会只能的检测节点的物理内存总量。
(2)yarn.nodemanager.vmem-pmem-ratio
任务每使用1mb物理内存,最多可使用虚拟内存,默认是2.1
(3)yarn.nodemanager.pmem-check-enabled...
(1) yarn.nodemanager.resource.memory-mb
表示在该节点上yarn可使用的物理内存数量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要减小这个值,而yarn不会只能的检测节点的物理内存总量。
(2)yarn.nodemanager.vmem-pmem-ratio
任务每使用1mb物理内存,最多可使用虚拟内存,默认是2.1
(3)yarn.nodemanager.pmem-check-enabled
是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(4)yarn.nodemanager.vmem-check-enabled
是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true。
(5)yarn.scheduler.minimum-allocation-mb
单个任务可申请的最少物理内存量,默认是1024(MB),如果一个任务申请的物理内存量少于该值,则该对应的值改为这个数。
(6)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB).
(7)小总结:计算节点的内存占用量。
默认情况下,一个同时运行了 namenode,secondarynamenode 和 nodemanager 的主节点,各自使用1000M 内存,所以总计使用3000M。
默认情况下,一个从节点运行了如下守护进程:
1个 datanode:默认占用1000M内存.
1个 tasktracker:默认占用1000M内存.
最多2个map任务:2*200M=400M.
最多2个reduce任务:2*200M=400Ma
yarn中的调度
在yarn中有三种调度器可以选择:FIFO Scheduler,Capacity Scheduler,Fair Scheduler。
FIFO Scheduler 把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推. FIFO Scheduler 是最简单的也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其他应用被阻塞。
而对于Capacity调度器,有一个专门的队列用来运行小...
在yarn中有三种调度器可以选择:FIFO Scheduler,Capacity Scheduler,Fair Scheduler。
FIFO Scheduler 把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推. FIFO Scheduler 是最简单的也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其他应用被阻塞。
而对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO 调度器时的时间。
在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器回为所有运行的job动态的调整系统资源。如下图所示:当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一般资源给这个小任务,让这两个小任务公平的共享集群资源。需要注意的是,在下图fair调度器中,从第二个任务提交获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。
抢占当一个job提交到一个繁忙集群中的空队列时,job并不会马上执行,而是阻塞直到正在运行的job释放系统资源,为了使提交job的执行时间更具预测性(可以设置等待的超时时间),Fair调度器支持抢占,抢占就是允许调度器杀掉占用超过其应占份额资源队列的containers,这些cintainers资源便可被分配到应该享有这些份额资源的队列中,需要注意抢占会降低集群的执行效率,因为被终止的containers需要被重新执行。
Yarn的运行机制
(1) 客户端练习资源管理器,请求他运行一个application master。
(2) 资源管理器找到一个能够在容器中启动application master 的节点管理器。
(3)根据application master 自己来确定的,如果所需资源少或者代码给定为一个,那么就经过简单的计算将结果反馈给客户端:如果所需资源大,或者需要节点运算,那么就向资源管理器申请更多的容器。
(4)进行分布式计算。
当程序运行完成时,ApplicationMaster从Resourcemanager...
(1) 客户端练习资源管理器,请求他运行一个application master。
(2) 资源管理器找到一个能够在容器中启动application master 的节点管理器。
(3)根据application master 自己来确定的,如果所需资源少或者代码给定为一个,那么就经过简单的计算将结果反馈给客户端:如果所需资源大,或者需要节点运算,那么就向资源管理器申请更多的容器。
(4)进行分布式计算。
当程序运行完成时,ApplicationMaster从Resourcemanager注销其容器,执行周期就完成了。
hadoop通信机制
hadoop的通信机制就是rpc
rpc是“Remote Peocedure Call”即"远地址过程调用"的缩写,这个机制的目的,是让一台机器上的程序像调用本地的"过程"那样来调用别的机器上的某些过程。
对rpc机制的要求:从程序代码上看,过程的调用者就好像在调用本地函数一样,但是被调用过程的代码实际上在别的机器上,被调用的过程是在别的机器上执行,然后返回执行的结果,对调用者而言就像从本地的函数调用返回一样,在这个过程中,调用者(线程)发动调用之后,就会进入睡眠,...
hadoop的通信机制就是rpc
rpc是“Remote Peocedure Call”即"远地址过程调用"的缩写,这个机制的目的,是让一台机器上的程序像调用本地的"过程"那样来调用别的机器上的某些过程。
对rpc机制的要求:从程序代码上看,过程的调用者就好像在调用本地函数一样,但是被调用过程的代码实际上在别的机器上,被调用的过程是在别的机器上执行,然后返回执行的结果,对调用者而言就像从本地的函数调用返回一样,在这个过程中,调用者(线程)发动调用之后,就会进入睡眠,直至调用返回时才被唤醒。
心跳机制
Hadoop集群是master/slave模式,master包括NameNode和ResourceManager,slave包括DataNode和NodeManager。
master启动的时候,会开一个 rpc server那里,等待slave心跳。slave启动时,会连接master,并每隔3秒钟主动向master发送一个"心跳",这个时间可以通过"hearbeat.recheck.interval"属性来设置,将自己的状态信息告诉master,然后master...
Hadoop集群是master/slave模式,master包括NameNode和ResourceManager,slave包括DataNode和NodeManager。
master启动的时候,会开一个 rpc server那里,等待slave心跳。slave启动时,会连接master,并每隔3秒钟主动向master发送一个"心跳",这个时间可以通过"hearbeat.recheck.interval"属性来设置,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。
需要指出的是,namenode与datanode之间的通信,ResourceManager与NodeManager之间的通信,都是通过"心跳"完成的。
(一)DataNode与NameNode之间的心跳:
DataNode上对于每个NameNode都有个BPServiceActor线程,这个线程会定期向其NameNode发送心跳报告。
在NameNode端,DataNode这一边的BPServiceActor线程通过BP和Rpc机制将心跳报文发送到NameNode这一边,然后就睡眠等待Namenode发回响应报文。
在NameNode端,NameNode这一边由NameNodeRpcServer负责接收和处理来自各个DataNode的心跳和报告做出反应,所以NameNode上的pb层接收到来自DataNode的心跳报文以后会调用NameNodeRpcserver的sendHearbeat()方法。
DataNode这一边sendHearbeat,NameNode这一边receiveHearbeat,但是由于rpc要求两边的函数名相同,所以Namenode这一边也是sendHearbeat。但事实上,Namenode不会发送心跳信号。
DataNode与Namenode的互动
DataNode与NameNode之间基本的通信就是rpc
(1)登记,dataNode一经启动就应该主动与namenode 建立rpc连接,并向其登记,让Namenode知道有这么一个DataNode已经在位了。
(2)向NameNode发送心跳信号并在这上面搭载各种报告,一来让它知道这个DataNode继续存在,二来让它知道这个DataNode上的存储发生了一些什么变化,特别是节点上有哪些数据块的副本,以及还有多少资源可供使用,这也总是由DataNode主动发起的。
(3)NameNode发回响应消息,由DataNode执行NameNode搭载在响应信息里的命令和要求。
DataNode与NameNode之间基本的通信就是rpc
(1)登记,dataNode一经启动就应该主动与namenode 建立rpc连接,并向其登记,让Namenode知道有这么一个DataNode已经在位了。
(2)向NameNode发送心跳信号并在这上面搭载各种报告,一来让它知道这个DataNode继续存在,二来让它知道这个DataNode上的存储发生了一些什么变化,特别是节点上有哪些数据块的副本,以及还有多少资源可供使用,这也总是由DataNode主动发起的。
(3)NameNode发回响应消息,由DataNode执行NameNode搭载在响应信息里的命令和要求。
安全模式
namenode启动时,首先将映像文件(fsiamge)载入内存,并重新编辑日志中的各项操作。一旦在内存中成功建立文件系统元数据的映像,则创建一个新的fsimage文件(该操作不需要借助namenode)和一个空的编辑日志。此时,namenode开始监听rpc和http请求。但是此刻namenode运行在安全模式,即namenode的文件系统对于客户端来说是只读的。(只有访问文件系统操作时肯定成功执行的,对于读文件操作,只有集群中当前的datanode上的块可用,才能工作。但文件修改操作,包括写、删或重命名均会失败)在安全模式下namenode并不向datanode发出任何块复制或删除的指令,...
namenode启动时,首先将映像文件(fsiamge)载入内存,并重新编辑日志中的各项操作。一旦在内存中成功建立文件系统元数据的映像,则创建一个新的fsimage文件(该操作不需要借助namenode)和一个空的编辑日志。此时,namenode开始监听rpc和http请求。但是此刻namenode运行在安全模式,即namenode的文件系统对于客户端来说是只读的。(只有访问文件系统操作时肯定成功执行的,对于读文件操作,只有集群中当前的datanode上的块可用,才能工作。但文件修改操作,包括写、删或重命名均会失败)在安全模式下namenode并不向datanode发出任何块复制或删除的指令,如果满足"最小副本条件"。namenode会在30秒钟后就退出安全模式,启动一个刚格式化的HDFS集群时,因系统中还没任何块,所以namenode不会进入安全模式。
安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。
安全模式命令:
hdfs dfsadmin -safemode get 显示是否处于安全模式
hdfs dfsadmin -safemode wait 一直等到某条命令到来之前才退出安全模式
hdfs dfsadmin -safemode enter 进入安全模式
hdfs dfsadmin -safemode leave 离开安全模式
安全模式配置(hdfs-site.xml):
<name>dfs.namenode.replication.min</name>
</value>1</value>
默认1,成功执行写操作所需创建的最小副本数(也成最小副本数级别)
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.999f</value>
默认值0.999,在namenode退出安全模式之前,系统中满足最小副本级别(由dfs.replication.min定义)的块的比例。这个值小等于0表示无须等待就可以退出安全模式;而如果这个值大于1表示永远处于安全模式。
Namenode 在重启的时候,DataNode需要向NameNode发送块的信息,Namenode只有获取到整个文件系统中有99.9%的块满足最小副本才会自动退出安全模式。假设我们设置的副本数(即参数dfs.replication)是5,那么在打他Node上就应该有5个副本存在,假设只存在3个副本,那么比率就是3/5=0.6。我们的副本率0.6明显小于0.999,因此系统会自动的复制副本到其他的datanode,争取是的最小副本率>=0.999。如果系统中有8个副本,超过我们设定的5个副本,那么系统也会删除多于的3个副本
<name>dfs.namenode.safemode.extension</name>
<value>3000</value>
默认值30000,在满足最小副本条件之后,namenode还需处于安全模式的时间,(以毫秒为单位),对于小型集群可设为0.
客户端将数据写入hdfs
(1)客户端通过对DistributeFileSystem 对象调用create()函数来创建文件。
(2)DistributedFileSystem 对namenode创建一个rpc调用,在文件系统的命名空间中创建一个新文件,但是此时该文件中还没有相应的数据块,namenode执行各个不同的检查以确保这个文件不存在,并且客户端有创建该文件的权限。如果这些检查均通过,namenode就会为创建新文件记录一条记录,否则,文件创建失败并向客户端抛出一个IoException异常。
(3)DistributeFileSystem向客户...
(1)客户端通过对DistributeFileSystem 对象调用create()函数来创建文件。
(2)DistributedFileSystem 对namenode创建一个rpc调用,在文件系统的命名空间中创建一个新文件,但是此时该文件中还没有相应的数据块,namenode执行各个不同的检查以确保这个文件不存在,并且客户端有创建该文件的权限。如果这些检查均通过,namenode就会为创建新文件记录一条记录,否则,文件创建失败并向客户端抛出一个IoException异常。
(3)DistributeFileSystem向客户端返回一个FSDataOutrputStream对象,由此客户端可以开始写入数据。就像读取事件一样,FSDataOutputStream封装一个DFSoutputstream对象,该对象负责处理datanode和namenode之间的通信。在客户端写入数据时,DFSoutputStream将它分成一个个的数据包,并写入内部队列,成为数据队列(data queue)
(4)dataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据备份。这一组datanode构成一个管道(pip管道)---我们假设副本数是3,所以管道中有3个节点。dataStream将数据包流式传输到管道中第一个datanode,该datanode存储数据包之后,并将它发送到管道中第二个datanode中,同样操作,第二个datanode 将数据包存储后给第三个datanode。
(5)DFSOoutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为"确认队列"(ack queue)。当收到管道中所有的datanode确认信息后,该数据包才会从确认队列删除。
如果在数据写入期间,datanode发生故障,则执行以下操作,这对与写入数据的客户端是透明的。首先关闭管道,确认把队列中的任何数据包都添回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包。为存储在另一个正常datanode的当前数据块制定一个新的标识,并将该标识传送给namenode。以便故障datanode在恢复后可以删除存储的部分数据块。从管道中删除故障数据节点并且把余下的数据块写入管道中剩余的两个正常的datanode。namenode注意到块副本量不足时,会在另一个节点上创建一个新的副本。后续的数据块继续正常接受处理。
(6)客户端完成数据的写入后,会对数据流调用close()方法。
(7)该操作将剩余的所有数据包写入datanode管道中,并在联系namenode发送文件写入完成信号之前,等待确认。namenode已经知道文件由哪些块组成(通过DataStreamer询问数据块的分配),所以它在返回成功💰只需要等待数据块进行最小量的复制。
客户端读取hdfs数据
(1)客户端通过调用FIleSystem对象的open()方法来打开希望读取的文件,对于hdfs来说,这个对象是分布式文件系统的一个实例。
(2)DistributedFileSystem通过使用rpc来调用namenode,以确定文件起始块的位置。对于每一个块,namenode返回存有该块副本的datanode的地址。此外,这些datanode根据他们与客户端的距离来排序(机架感应)。如果该客户端本身就是一个datanode(比如,...
(1)客户端通过调用FIleSystem对象的open()方法来打开希望读取的文件,对于hdfs来说,这个对象是分布式文件系统的一个实例。
(2)DistributedFileSystem通过使用rpc来调用namenode,以确定文件起始块的位置。对于每一个块,namenode返回存有该块副本的datanode的地址。此外,这些datanode根据他们与客户端的距离来排序(机架感应)。如果该客户端本身就是一个datanode(比如,在一个MapReduce任务中),并保存有相应数据块的一个副本时,该节点将从本地datanode中读取数据。
(3)DistributedFileSystem类返回一个FSDataInputStream对象(支持文件定位的输入流)给客户端读取数据。FSDataInput类转而封装DFSInputStream对象,该对象管理着datanode、namenode的I/O。接着,客户端对这个输入流调用read()方法。
(4)存储着文件起始块的datanode地址的DFSInputStream 随即连接距离最近的文件中第一个块所在的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端。
(5)到达块的末端时,DFSInputStream 会关闭与该datanode的连接,然后寻找下一个块的最佳datanode。客户端只需要读取连续的流,并且对于客户端都是透明的。客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它根据需要询问namenode来检索下一批所需块的datanode的位置。
(6)一旦客户端读取完成,就对FSDataInputStream调用close()方法,在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,它便尝试从这个块的另外一个最邻近的datanode读取数据。它也会记住哪个故障的datanode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会通过校验和确认从datanode发来的数据是否完整。如果发现一个损坏的块,它就会在DFSInputStream 试图从其他datanode读取一个块的副本,也会将被损坏的块通知给namonode。
这个设计的一个重点是,客户端可以直接连接到datanode检索数据,且namenode告诉客户端每个块中的最佳datanode。由于数据流分散在该集群中所有的datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同时,namenode仅需要响应快位置的请求(这些信息存储在内存中,因而非常高效),而无需响应数据请求,否则随着客户端数量的增长,namenode很快会成为一个瓶颈。