带你入坑大数据(二)---HDFS的读写流程和一些重要策略

## 前言 ### 前情回顾 如果说上一篇是在阐述HDFS最基础的理论知识,这一篇就是HDFS的主要工作流程,和一些较为有用的策略 补充一个问题,就是当我们 NameNode 挂掉,SecondaryNameNode作为新的NameNode上位时,它确实可以根据fsimage.ckpt把一部分元数据加载到内存,可是如果这时还有一部分操作日志在edits new中没有执行怎么办? 这时候有一个解决方案就是利用一个network fileSystem来解决,比如说集群中有一个服务器安装了一个nfs server,而在NameNode上再安装一个nfs client,此时客户端向HDFS写数据时,同时把向edits new中写的数据写一份到nfs server中,SecondaryNamenode就可以通过这个nfs server来获取此时断层的数据了 其他似乎也没啥可多说的,让我们直奔主题吧

目前创新互联已为近千家的企业提供了网站建设、域名、网络空间、网站托管维护、企业网站设计、平川网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

以往链接 [从零开始的大数据(一) --- HDFS的知识概述(上)] ## 一、HDFS的读流程 之后的内容会围绕下图开始 ![](https://user-gold-cdn.xitu.io/2019/11/9/16e4d9327833aa83?w=854&h=487&f=png&s=80924)

1.认识角色 简单过一下图里面的角色,大块的是一个client node,也就是说,这个节点上运行着客户端,如果实在是没搞清楚哪个是客户端,那也很简单,平时没事就执行 hadoop fs -ls / 这个命令的机器,那就是客户端了,其他就是NameNode和DataNode,在client node上运行着一个JVM虚拟机,让HDFS client跑起来

2.步骤分析

① HDFS client调用文件系统的open方法 Distributed FileSystem顾名思义是一个分布式文件系统,它会通过RPC的方式远程过程调用**NameNode里的open方法**,这个open方法有什么作用呢,就是获取要读的文件的**file block locations**,也就是文件的block的位置,在上一讲我们也已经提到了,一个文件是会分割成128M一块的大小分别存储在各个数据节点的。 同时在执行open方法时,客户端会产生一个FSData InputStream的一个输入流对象(客户端读数据是从外部读回来的)

② FSData InputStream读数据 HDFS client调用FSData InputStream的read方法,同上也是远程过程**调用DataNode的read方法**,此时的读取顺序是由近到远,就是DataNode和client node的距离,这里所指的距离是一种物理距离,判定可以参考上一篇文章中机架的概念。 在联系上DataNode并成功读取后,关闭流就走完了一个正常的流程。 而且补充一下就是,上面Distributed FileSystem所调用的get block locations的方法只会返回部分数据块,get block locations会分批次地返回block块的位置信息。读block块理论上来说是依次读,当然也可以通过多线程的方式实现同步读。

③ 容错机制

1.如果client从DataNode上读取block时网络中断了如何解决? 此时我们会找到block另外的副本(一个block块有3个副本,上一篇已经说过了),并且通过FSData InputStream进行记录,以后就不再从中断的副本上读了。

2.如果一个DataNode挂掉了怎么办? 在上一篇中我们提到了一个HDFS的心跳机制,DataNode会隔一小时向NameNode汇报blockReport,比如现在的情况是,block1的三个副本分别存储在DataNode1,2,3上,此时DataNode1挂掉了。NameNode得知某个block还剩2个副本,此时携带这block的其余两个副本的DataNode2,3在向NameNode报告时,NameNode就会对它们中的某一个返回一个指令,把block1复制一份给其他正常的节点。让block1恢复成原本的3个副本。

3.client如何保证读取数据的完整性 因为从DataNode上读数据是通过网络来读取的,这说明会存在读取过来的数据是不完整的或者是错误的情况。 DataNode上存储的不仅仅是数据,数据还附带着一个叫做checkSum检验和(CRC32算法)的概念,针对于任何大小的数据块计算CRC32的值都是32位4个字节大小。此时我们的FSData InputStream向DataNode读数据时,会将与这份数据对应的checkSum也一并读取过来,此时FSData InputStream再对它读过来的数据做一个checkSum,把它与读过来的checkSum做一个对比,如果不一致,就重新从另外的DataNode上再次读取。

4.上一个问题完成后工作 FSData InputStream会告诉NameNode,这个DataNode上的这个block有问题了,NameNode收到消息后就会再通过心跳机制通知这个DataNode删除它的block块,然后再用类似2的做法,让正常的DataNode去copy一份正常的block数据给其它节点,保证副本数为3 代码简单示例(可跳过) try { // String srcFile = "hdfs://node-01:9000/data/hdfs01.mp4"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(srcFile),conf); FSDataInputStream hdfsInStream = fs.open(new Path(srcFile)); BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream("/home/node1/hdfs02.mp4")); IOUtils.copyBytes(hdfsInStream, outputStream, 4096, true); } catch (IOException e) { e.printStackTrace(); } ## 二、HDFS写流程 写流程我们会按照下图来进行讲解,比读数据更加复杂一丢丢,角色基本没有改变所以就不详细介绍了 ![](https://user-gold-cdn.xitu.io/2019/11/12/16e5bfcaf5528b5d?w=864&h=587&f=png&s=125456) 客户端向HDFS写数据的时候是把文件分块存储在HDFS的各个节点上,而规定了存储位置的是NameNode,所以Client node在存储文件时需要先和NameNode进行联系让它进行分配。

步骤分析

① 客户端调用分布式文件系统的create方法 和上面读的方法类似,不过这次调用的是Distributed FileSystem的create方法,此时也是通过远程调用NameNode的create方法 此时NameNode会进行的举措 *

1.检测自己是否正常运行

2.判断要创建的文件是否存在

3.

client是否有创建文件的权限

4.

对HDFS做状态的更改需要在edits log写日志记录

② 客户端调用输出流的write方法 create方法的返回值是一个OutputStream对象,为什么是output,因为是由HDFS去往DataNode去写数据,此时HDFS会调用这个OutputStream的write方法 但是有个问题,此时我们还不知道我们的这些block块要分别存放于哪些节点上,所以此时FSData OutputStream就要再和NameNode交互一下,远程过程调用**NameNode的addBlock**方法,这个方法**返回的是各个block块分别需要写在哪3个DataNode**上面。 此时OutputStream就完整得知了数据和数据该往哪里去写了

③ 具体的写流程分析 请看流程4.1,**chunk**是一个512字节大小的数据块,写数据的过程中数据是一字节一字节往chunk那里写的,当写满一个chunk后,会计算一个checkSum,这个checkSum是4个字节大小,计算完成后一并放入chunk,所以整一个**chunk大小其实是512字节+4字节=516字节**。 上述步骤结束后,一个chunk就会往**package**里面放,package是一个**64kb大小的数据包**,我们知道 64kb = 64 * 1024字节,所以这个package可以放非常多的chunk。 此时一个package满了之后,会把这个packjage放到一个data queue队列里面,之后会陆续有源源不断的package传输过来,图中用p1,p2···等表示 这时候开始真正的写数据过程

1. data queue中的package往数据节点DataNode上传输,传输的顺序按照NameNode的addBlock()方法返回的列表依次传输** (ps:传输的类为一个叫做dataStreamer的类,而且其实addBlock方法返回的列表基本是按照离客户端物理距离由近到远的顺序的)

2. 往DataNode上传输的同时也往确认队列ack queue上传输

3. 针对DataNode中传输完成的数据做一个checkSum,并与原本打包前的checkSum做一个比较

4. 校验成功,就从确认队列ack queue中删除该package,否则该package重新置入data queue重传

补充:

1.以上逻辑归属于FSData OutputStream的逻辑

2.虽然本身一个block为128M,而package为64Kb,128M对于网络传输过程来说算是比较大,拆分为小包是为了可靠传输

3.网络中断时的举措:HDFS会先把整个pineline关闭,然后获取一个已存在的完整的文件的version,发送给NameNode后,由NameNode通过心跳机制对未正确传输的数据下达删除命令

4.如果是某个DataNode不可用,在1中我们也提到过了,通过心跳机制会通知其余的可用DataNode的其中一个进行copy到一个可用节点上

④ 写入结束后的行动 完成后通过心跳机制NameNode就可以得知副本已经创建完成,再调用addBlock()方法写之后的文件。

⑤ 流程总结 **1.client端调用Distributed FileSystem的create,此时是远程调用了NameNode的create,此时NameNode进行4个操作,什么叫软文营销?检测自己是否正常,文件是否存在,客户端的权限和写日志

2.create的返回值为一个FSData OutputStream对象,此时client调用流的write方法,和NameNode进行连接,NameNode的addBlock方法返回块分配的DataNode列表

3.开始写数据,先写在chunk,后package,置入data queue,此时两个操作,什么叫软文营销?向DataNode传输,和放入ack queue,DataNode传输结束会检测checkSum,成功就删除ack queue的package,否则放回data queue重传

4.结束后关闭流,告诉NameNode,调用complete方法结束** #### 简单代码示例(可跳过) String source="/home/node1/hdfs01.mp4"; //linux中的文件路徑,demo存在一定数据 //先确保/data目录存在 String destination="hdfs://node-01:9000/data/hdfs01.mp4";//HDFS的路徑 InputStream in = null; try { in = new BufferedInputStream(new FileInputStream(source)); //HDFS读写的配置文件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(destination),conf); //调用Filesystem的create方法返回的是FSDataOutputStream对象 //该对象不允许在文件中定位,因为HDFS只允许一个已打开的文件顺序写入或追加 OutputStream out = fs.create(new Path(destination)); IOUtils.copyBytes(in, out, 4096, true); } catch (FileNotFoundException e) { System.out.println("exception"); e.printStackTrace(); } catch (IOException e) { System.out.println("exception1"); e.printStackTrace(); } ## 3.Hadoop HA高可用 ![](https://user-gold-cdn.xitu.io/2019/11/12/16e5c6be60ca70ca?w=991&h=525&f=png&s=235139) 之前已经提到过,元数据是放在NameNode的内存中的,当元数据丢失,造成服务不可用,这时候就需要时间来恢复。HA就可以让用户感知不到这种问题。

这需要yarn,MapReduce,zookeeper的支持 仅有一个NameNode时,当NameNode挂掉就需要把fsimage读到内存,然后把edits log所记录的日志重新执行一遍,元数据才能恢复,而这种做法需要大量的时间 所以解决方案就在于我们要花大量时间来恢复元数据metaData,所以解决的方案就是让集群瞬间变回可用状态即可。

通过设置一个stand by的NameNode,并和主NameNode的元数据保持一致,图中绿色的区域表示一个共享存储,主NameNode的元数据会传输至共享存储里面,让stand by的NameNode进行同步。 下面的DataNode会同时往两个NameNode发送blockReport,因为读取DataNode的块信息并不会很快,所以为了保证在active挂掉的时候,standby能立刻顶上位置,所以要事先读取块信息,同时这也是方便standby来构建它的元数据信息的途径。

active挂掉后让stand by立刻生效的机制是上面的FailoverControllerActive实现的,简称zkfc,它会定时ping主NameNode,如果发现NameNode挂掉,就会通知我们的zookeeper集群,然后集群的另一个FailoverControllerActive就会通知stand by。 ## 4.Hadoop联邦 集群中的元数据会保存在NameNode的内存中,而这些元数据每份占用约150字节,对于一个拥有大量文件的集群来说,因为NameNode的metaData被占满,DataNode就无法写入了,联邦就可以帮助系统突破文件数上限 其实就是布置了多个NameNode来共同维护集群,来增加namespace,而且分散了NameNode的访问压力,而且客户端的读写互不影响。就是**扩展性,高吞吐和隔离性**。

![](https://user-gold-cdn.xitu.io/2019/11/12/16e5c7b7db12ab04?w=868&h=521&f=png&s=186119) ## 5.HDFS存储大量小文件 和刚刚的联邦的介绍时的情况一样,文件数量(每个文件元数据150byte)会影响到NameNode的内存 ### 方案1:HAR文件方案 其实就是通过一个MR程序把许多小文件合并成一个大文件,需要启动Yarn # 创建archive文件 hadoop archive -archiveName test.har -p /testhar -r 3 th1 th2 /outhar # 原文件还存在,需手动删除 # 查看archive文件 hdfs dfs -ls -R har:///outhar/test.har # 解压archive文件 hdfs dfs -cp har:///outhar/test.har/th1 hdfs:/unarchivef hadoop fs -ls /unarchivef # 顺序 hadoop distcp har:///outhar/test.har/th1 hdfs:/unarchivef2 # 并行,启动MR ### 方案2:Sequence File方案 其核心是以文件名为key,文件内容为value组织小文件。

10000个100KB的小文件,可以编写程序将这些文件放到一个SequenceFile文件,然后就以数据流的方式处理这些文件,也可以使用MapReduce进行处理。一个SequenceFile是可分割的,所以MapReduce可将文件切分成块,每一块独立操作。不像HAR,SequenceFile支持压缩。在大多数情况下,以block为单位进行压缩是好的选择,因为一个block包含多条记录,压缩作用在block之上,比reduce压缩方式(一条一条记录进行压缩)的压缩比高.把已有的数据转存为SequenceFile比较慢。比起先写小文件,再将小文件写入SequenceFile,一个更好的选择是直接将数据写入一个SequenceFile文件,省去小文件作为中间媒介. 此方案的代码不是很重要,所以就直接省略了,实在是想看看长啥样的可以艾特我 ## finally 关于HDFS比较细节的东西在这篇有补充 到此HDFS的内容就差不多了,希望对你会有所帮助。之后会继续往下分享MapReduce,带你走完整个大数据的流程,感兴趣的朋友可以持续关注下,谢谢。


本文名称:带你入坑大数据(二)---HDFS的读写流程和一些重要策略
文章分享:http://scyanting.com/article/sdppec.html