hadoop源码解读

一、hadoop rpc总结

1、RPC指的是不同进程的方法调用,分为客户端和服务端,客户端调用服务端的方法,方法的执行在服务端。

2、如何实现Hadoop RPC的调用,必须要实现协议,这个协议其实就是一个接口,但是这个接口必须要有一个重要的特征,里面必须有VersionID.

3、RPC的服务端必须实现这些协议(接口)。

4、如何构建RPV的调用。

服务端:(构建者模式)

Server server = new RPC.Builder(new Configuration()).setBindAddress("localhost").setPort(9999).setProtocol(ClientProtocol.class).setInstance(new NameNodeRpcServer()).build();客户端:获取代理以及各种参数ClientProtocol namenode = RPC.getProxy(ClientProtocol.class, 1234L,new InetSocketAddress("localhost",9999),new Configuration());

Hadoop源码中有两种RPC,一种Hadoop RPC,另一种是HttpServer RPC,有什么区别?

应对的数据量不同,如果传输的数据量比较大,比如读写日志,用httpserver rpc,数据量比较小时,就是RPC之间的调用,用Hadoop RPC.

二、源码流程解读 启动

NameNode启动流程:

在createNameNode方法中通过不同的场景switch …… case进入(format,rollBack,checkPoint,recover)默认进入实例化 NameNode(new NameNode)

默认进入实例化NameNode(new NameNode)-> initialize(conf)初始化方法。

  1. startHttpServer方法 -> 设置主机名和端口号(50070),绑定多个servlet(功能)

  2. 加载元数据

loadFromDisk(conf) -> loadFsImage(startOpt)

1)合并元数据,将fsimage和editlog合并

2)把合并出来新的fsimage写到磁盘,老的删掉

3)打开一个新的editlog,开始写日志。

3. 创建RPC服务端

createRpcServer(conf) -> NameNodeRpcServer -> 启动ServiceRpcServer

4. 启动公共服务,NameNode RPC的服务就在里面启动的

1)进行资源检查,检查存储元数据的磁盘空间是否足够

a. 如果磁盘空间不足;会在日志里打印告警,且hasResourceAvailable = false

2)进入安全模式检查,检查是否可以退出安全模式

HDFS进入安全模式的三个条件(或关系):

条件一:计算阈值,block 块数 * 0.999,判断目录元数据是否大于阈值

threshold != 0 && blockSafe < blockThreshold

HDFS的元数据那儿程序总计分析出来上一次关闭集群之前

假设有1000个complete的block,默认是阈值的计算比例是0.999

这样blockThreshold的值是999

现在集群起来了以后,发现累计datanode汇报过来的complete的block个数(blockSafe)

如果小于999就让集群处于安全模式。

条件二:判断存活dataNode个数是否大于配置数目

datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold

如果存活的datanode的个数小于一定的数目的时候,也会进去安全模式

默认是0,所以相当于没启用,但是我们也可以配置,如果存活的datanode个数

少于多少就让HDFS集群出入安全模式。

条件三:检查NameNode写元数据目录是否大于100M

!nameNodeHasResourcesAvailable()

就是前面 检查NameNode写的元数据的目录空间是否大于100M,

如果目录的空间小于100M,nameNodeHasResourcesAvailable 就为false

hdfs就会进入安全模式。

DataNode 总结:

1)一个集群里面可以有很多个DataNode,这些DataNode就是用来存储数据(hdfs上block文件块)。

2)DataNode启动了以后会周期性的跟NameNode进行通信(心跳,块汇报),客户端也可以跟DataNode进行交互或者DataNode之间也可以进行相互通信。

3)NameNode不能直接操作DataNode.而是通信心跳返回值指令的方式操做DataNode.DataNode也会去响应NameNode,响应NameNode发送过来的一些指令,比如:删除block,复制block等操作。

4) DataNode启动了以后开放了一个socket的服务(RPC),等待别人去调用他。DataNode启动的时候会把自己的主机名和端口号汇报给NameNode.也就是说如果Client和DataNode想要去访问某个DataNode.首先要跟NameNode进行通信;从NameNode那儿获取到目标DataNode的主机名和端口号。这样才可以访问到对应的DataNode了。

DataNode启动流程:

secureMain -> createDataNode(初始化DataNode) -> instantiateDataNode(实例化DataNode) -> makeInstance -> new DataNode -> startDataNode(启动DataNode)

initDataXceiver (初始化DataXceiver,dataNode用来接收客户端和其它DataNode传来block数据的服务)

startInforServer (启动HttpServer服务,绑定了多个servlet)

initRpcServer (初始化RPC服务)

new BlockPoolManager 创建了BlockPoolManager对象

refreshNameNodes

1. 向NameNode进行注册

2. 跟NameNode进行心跳

doRefreshNameNode

1)如果是联邦架构,里面会有多个NameService

2)一个联邦就是一个NameService

a. 一个联邦对应一个BPOfferService

b. 一个联邦的一个NameNode就是一个BPServiceActor

c. 正常来说一个BPOfferService对应NameNode个数个BPServiceActor

3. startAll(DataNode向NameNode进行注册和心跳)

HDFS心跳流程:

心跳主要就是两个目的:

1. 更改存储信息

2. 更新上一次的心跳时间 

总结:在分布式场景下

注册:从节点向主节点进行注册本质上,就是把自己的主机名、端口号等信息写到主机的各种内存结构中。

心跳:对于分布式文件系统,就是把自己的存储信息告诉主节点,更新上一次的心跳时间

三、源码流程解读 写数据场景

HDFS元数据管理流程

 HDFS双缓冲机制

思考三个问题: 

1. 交换内存的条件是什么?

2. 将磁盘写改为内存写,会不会存在丢数据的风险?

3. 当数据从SyncBufffer内存往磁盘写数据还没写完的同时,client请求由于高并发的原因往CurrentBuffer内存中写数据写满,NameNode会有什么表现形式?

元数据创建流程:

  1. 创建元数据目录树

  2. 通过双缓冲机制将元数据写到本地和Journalnode(通过自己实现的NIO)

  3. standBy NameNode从JournalNode读取元数据(跨服务跨进程读取,后台的线程),把获取到的元数据作用到自己的元数据里面。

通过创建了一个HttpURLConnection对象,发送一个Http请求(相当于一个RPC),读取数据流。通过流对烤方式将元数据写到standBy NameNode目录树上。

  1. 定期checkPoint,将内存中的目录树合并元数据并持久化到磁盘上,替换fsImage,将已经合并完的日志删除。

checkPoint两个条件(或):

1. 时间 (距上次checkPoint时间,默认一个小时)

2. 数量(比如多少条日志,默认100万条)

checkPoint步骤:

1. 把元数据持久化到磁盘

2. 开启一个异步线程,把刚从内存里面的数据持久化到磁盘,上传数据到active namenode上面

HDFS上传文件源码流程 :

  1. create抽象方法,DistributedFileSystem实现类中的create方法实现(客户端)

a) 创建了一个DFSOutputStream,做很多初始化操作

1)往文件目录树里面添加INodeFile

2)添加了[契约(Lease)]管理

1⃣️ 先查看这个契约是否已经存在

a. 如果没有(第一次进来)肯定创建一个契约

存储到数据结构中(可以排序<实现compare进行升序排序>,底层是红黑树数据结构)

2⃣️ 如果有(第二次进来)那就是续约

3)启动了DataStreamer(写数据流程关键服务)/重要

第一次进入时,dataQuene没有数据,会启用线程阻塞

b) 开启续约(契约)

调用线程run方法,进行周期性续约

超过30秒没有进行续约就进行续约(当前时间-上一次续约时间)

续约和心跳类似,获取namenode的代理进行续约,续约完修改上一次续约时间,如果有契约,先从数据结构中删除契约,修改上一次的契约心跳时间,再把修改完以后的契约加入到数据结构中。同样会有个类似于心跳的监控线程,去检查契约是否过期;从最老的契约开始检查。

2. write方法

HDFS文件 -> Block块(128M)-> packet(64K)= chunk(127个chunk构成一个packet) -> chunk(521 bit) + chunksum(校验值 4 Bit)  = 516 Bit

1. 计算出chunk的校验和

2. 按照chunk的大小遍历数据

一个一个的chunk去写数据

创建packet

往packet里面写chunk的校验和(4 Bit)

往packet里面写chunk(512 Bit)

写满127个chunk就是一个完整的packet

写满128M就是一个block

写满一个packet,就把这个packet写入队列(如果队列写满就等待)

唤醒之前睡眠的队列(因为此时已经有数据了)

3. 从dataQuene队列里面获取到数据(packet)

4. 建立数据通道

A. 向namenode申请block

因为申请block或者建立数据管道,这些都是重要的操作,肯定要执行成功,但是这些操作都涉及到网络请求,网络这个事,就不好说了,可能会有网络抖动什么的,所以代码中执行一次,不是说失败就失败了,肯定要多次尝试,所以HDFS源码里面很多地方就会用到循环。

服务端那边的操作:

1⃣️ 创建一个block,往文件目录树中挂载了block的信息

2⃣️ 在磁盘上记录了元数据信息

3⃣️ 往blockManager里面记录了block的元数据信息

B. 建立数据通道

1建立数据管道的目的就是提前将就收数据的线程或者socket服务启动起来,启动起来以后就构建好数据管道。

2 HDFS中就是客户端往hadoop1中写,在从hadoop1往hadoop2写。。。

这样设计的目的:

1. 减少客户端网络带宽连接压力

2. 客户端和hadoop1服务夸机房或者夸地域,这样传输的性能会差

3然后发送写数据请求,通过之前初始化好的DataXceiver来写数据

1.接收socket请求,每发送过来一个block,都启动一个DataVceiver去处理这个block,就是启动一个线程去处理。先去读取此次请求的类型(option)

2.根据请求类型进行处理,(写block)

3.通过writeBlock实现,里面创建BlockReceiver,并且查看是否有下游的服务器,有的话就创建镜像(副本),接着往下游发送socket连接

4⃣️ 建立管道时,有可能遇到管道建立不上,某个服务器连接不上

如果管道建立不成功,客户端调用服务器(namenode)代码,去放弃这个block,并且重新去申请Block,同时记录记录出问题那台服务器的编号。(记录原因:需要重新去申请block,namenode根据负载均衡和机架感知去重新申请,就得记录下来失败的那台机器,再一次重试的时候,就排除有故障的服务器)

5. 启动了ResponseProcessor,用来监听一个packet是否发送成功

DataStream会将数据(packet)发送到datanode上面,datanode到底有没有写成功,需要返回一个成功的响应(ACK),最终向客户端汇报处理的结果。

这个过程中会有一个AckQueue配合使用,会将这个packet先放到AckQueue中(把当前接收到的packet放大ackQueue,唤醒wait的线程,同时将dataQueue中的packet移除),再把当前的这个packet发送给下游的节点(数据管道里面),然后校验数据,没问题,就将数据写到本地磁盘上面;写成功的话就返回写成功,写失败的话,先重试,不行就会将AckQueue中的这个packet重新返回给dataQueue,dataQueue有这个数据后,就会将这个数据重新写一遍。(写到各个磁盘上面应该是同步的)

如果写成功,就会将这个packet从AckQueue中移除。

容错,写的过程中,很可能会遇到问题,通过try…catch捕获异常,捕获到异常,就会将hasError标识改为true,本身就是分布式的代码,循环执行的,他会再次进入代码,但是会有判断,进入时就会进,关闭流和线程的代码,并且进入processDatanodeError方法去处理,首先关闭流,重新把ackQueue的数据加入到dataQueue中,并将ackQueue中的packet清空,重新建立数据管道,这次建立管道会将有问题的服务器排除,直接传输正常的服务器节点。那么这样一来,副本数就会少一个,不用担心,等到datanode和namenode心跳的时候,会进行容错,将正常节点上的副本复制到之前有问题的节点上。还有一种情况,集群中超过一半的节点有问题,问题就比较大了,这时候就需要推倒重来,重新申请block,重新建立管道。

 

先引入一个小的背景,假如多个客户端同时要并发的写Hadoop HDFS上的一个文件,这个事儿能成吗? 明显不可以接受啊,因为HDFS上的文件是不允许并发写的,比如并发的追加一些数据什么。所以HDFS里有一个机制,叫做文件契约机制。也就是说,同一时间只能有一个客户端获取NameNode上面一个文件的契约,然后才可以向获取契约的文件写入数据。

此时如果其他客户端尝试获取文件契约的时候,就获取不到,只能干等着。通过这个机制,可以保证同一时间只有一个客户端在写一个文件。在获取到了文件契约之后,在写文件的过程期间,那个客户端需要开启一个线程,不停的发送请求给 NameNode进行文件续约,告诉NameNode: NameNode大哥,我还在写文件啊,你给我一直保留那个契约好吗? 而NameNode内部有一个专门的后台线程,负责监控各个契约的续约时间。如果某个契约很长时间没续约了,此时就自动过期掉这个契约,让别的客户端来写。

1. 创建文件

2. 创建契约

3. 启动了DataStramer线程 

4. 开启了续约

5. 契约的检查

6. 创建packet

7. 申请Block

8. 建立数据管道

9. ResponseProcessor线程

10. PacketResponder线程

四、RPC示例:

/**
*在pom.xml引入依赖
*/
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.0</version></dependency>
</dependencies>/*** 网络协议*/
public interface Protocol {//定义版本号,可自定义long versionID=123456789L;void hello(String msg);void add(int num);
}/**
*定义服务端实现类
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;import java.io.IOException;public class NameNodeRPCServer implements Protocol {public void hello(String msg) {System.out.println(" hello " + msg);}public void add(int num) {}public static void main(String[] args) throws IOException {Server server = new RPC.Builder(new Configuration()).setBindAddress("localhost").setPort(9999).setProtocol(Protocol.class).setInstance(new NameNodeRPCServer()).build();//启动服务端System.out.println("我是RPC服务端,我准备启动了");server.start();System.out.println("启动完成");}
}/**
*定义客户端类
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;import java.io.IOException;
import java.net.InetSocketAddress;public class Client {public static void main(String[] args) throws IOException {Protocol namenode = RPC.getProxy(Protocol.class,Protocol.versionID,new InetSocketAddress("localhost", 9999),new Configuration());namenode.hello("hadoop architechure");}}

启动NameNodeRPCServer,到服务器控制台执行jps,你会发现多了一个NameNodeRPCServer进程,所以不管是NameNode还是DataNode,其实都是一个RPC进程,于是我们可以从NameNode和DataNode这两个类入手.

 NameNode服务既管理了HDFS的集群的命名空间和 "inode table"。
一个HDFS集群里面只有一个NameNode.(除了HA方案,或者是联邦)

 Namenode管理了两张极其重要的表:
1)一张表管理了文件与block之间的映射关系。
2)另一张表管理了block文件块与 DataNode主机之间的映射关系。

第一张表存储到了磁盘上面。(因为文件与block块之间的关系是不会发生变化的)
每次NameNode重启的时候重新构建第二张映射表。

 Namenode服务是由三个重要的类支撑的:
 1)NameNode类:
 管理配置的参数
2)NameNode server:
 IPC Server:
NameNodeRPCServer:开放端口,等待别人调用.比如:8020/9000
 HTTP Server:
NameNodeHttpServer:开放50070界面,我们可以通过这个界面了解HDFS的情况
 3) FSNameSystem:
    这个类非常重要,管理了HDFS的元数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/181819.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Android Jetpack】Room数据库

文章目录 引入EntitiesPrimary Key主键索引和唯一性对象之间的关系外键获取关联的Entity对象嵌套对象Data Access Objects&#xff08;DAOs&#xff09;使用Query注解的方法简单的查询带参数查询返回列的子集可被观察的查询 数据库迁移用法 引入 原始的SQLite有以下两个缺点: …

LCD屏接口与模式详解:干货超多

前言 随着时代的发展&#xff0c;现如今我们生活上已经随处可见的各种电子产品了&#xff0c;诸如手机、平板、电脑、一些其它智能单品上都有用到显示屏&#xff0c;它作为人机交互的重要桥梁之一&#xff0c;我认为它是生活中必不可少的存在&#xff0c;如果少了它&#xff0c…

【Electron】上下键切换消息

需求&#xff1a; 如图&#xff0c;需要监听上下键切换消息 Electron 注册 全局快捷键【globalShortcut】监听 在focus注册 在blur 注销 如苹果系统在使用某个软件(focus)时 右上角会有应用标题 Electron 代码&#xff1a; win.on(focus, ()>{globalShortcut.register(U…

物联网边缘计算是什么?如何实现物联网边缘计算?

物联网边缘计算是一种在物联网设备和网络中实施计算和数据处理的技术。它允许在物联网设备或网络边缘进行数据分析和处理&#xff0c;而不需要将所有数据传输到远程数据中心或云端进行处理。物联网边缘计算将计算和数据处理的能力迁移到物联网设备的边缘&#xff0c;使得设备能…

如何群发发票邮件内容

群发发票邮件内容需要仔细考虑邮件的主题、正文和附件内容&#xff0c;以确保邮件的准确性和完整性。以 一、明确邮件目的 在群发发票邮件时&#xff0c;首先需要明确邮件的目的。一般来说&#xff0c;发票邮件的目的是向客户或供应商提供交易的详细记录和证明。因此&#xf…

啊哒-MISC-bugku-解题步骤

——CTF解题专栏—— 题目信息&#xff1a; 题目&#xff1a;啊哒 作者&#xff1a;第七届山东省大学生网络安全技能大赛 提示&#xff1a;无 解题附件&#xff1a; 解题思路&#xff1a; 图片的话还是老三样斧winwalk、010Editor、Stegsolve。ok直接开搞&#xff01; 解题…

基于UDP的TFTP文件传输

代码&#xff1a; #include <myhead.h>//实现下载功能 int download(int cfd,struct sockaddr_in sin) {char buf[516] ""; //定义资源包char fileName[128] ""; //定义文件名printf("请输入文件名:");scanf("%s",fileName…

Re0: 从零实现一个置顶任意窗口的小工具

前言 话不多说&#xff0c;先上效果&#xff1a; 这里展示的是通过下拉框选择窗口&#xff0c;让窗口显示并置顶&#xff0c;其实还可以直接通过快捷键&#xff08;先鼠标点击要置顶的窗口&#xff0c;再使用CTRLSHIFTT&#xff09;&#xff0c;本文涉及到的完整代码已上传到G…

【JavaEE初阶】 HTTP 请求 (Request)详解

文章目录 &#x1f340;序言&#x1f384;认识URL&#x1f6a9;URL 基本格式&#x1f6a9;query string&#x1f6a9;关于 URL encode &#x1f334;认识 "方法" (method)&#x1f6a9;GET方法&#x1f6a9;POST 方法&#x1f6a9; GET 和 POST 的区别 &#x1f38b;…

Java后端开发——JDBC(万字详解)

Java后端开发——JDBC&#xff08;万字详解&#xff09; 今日目标 掌握JDBC的的CRUD理解JDBC中各个对象的作用掌握Druid的使用 1&#xff0c;JDBC概述 在开发中我们使用的是java语言&#xff0c;那么势必要通过java语言操作数据库中的数据。这就是接下来要学习的JDBC。 1.1 …

【Axure高保真原型】区间评分条

今天和大家分享区间评分条的原型模板&#xff0c;我们可以左右拖动移动滑块到指定位置&#xff0c;评分条上方会根据两个滑块所在的位置&#xff0c;自动计算出对应的区间范围&#xff1b;这个原型模板使用也很简单&#xff0c;只需要在里面填写区间的最大值&#xff0c;即可自…

纯前端实现导入excel数据

准备工作 - 下载 xlsx npm install xlsx下面直接上代码&#x1f447; <template><div><input type"file" accept".xlsx, .xls" change"handleClick"></div> </template><script langts setup> import * a…

24.有哪些生命周期回调方法?有哪几种实现方式?

有哪些生命周期回调方法?有哪几种实现方式? 有两个重要的bean 生命周期方法, 第一个是init , 它是在容器加载bean的时候被调用。第二个方法是 destroy 它是在容器卸载类的时候被调用。bean 标签有两个重要的属性(init-method和destroy-method)。用它们你可以自己定制初始…

盛最多水的容器-中等

leetcode链接 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能倾…

JS之Object.defineProperty方法

给对象添加属性的方法有许多&#xff0c;这次让我为大家介绍一种给对象添加属性的静态方法吧&#xff01; 语法&#xff1a;Objcet.defineProperty(对象的名称&#xff0c;“添加的键名”&#xff0c;{value&#xff1a;键值}) const obj {name:"张三",age:18}// 我…

Typora .MD笔记中本地图片批量上传到csdn (.PNG格式)(无需其他任何图床软件)

Typora .MD笔记中本地图片批量上传到csdn &#xff08;.PNG格式&#xff09;(无需其他任何图床软件) 截图软件推荐 qq 截图 快捷键 ctrlshiftA. 步骤一 设置Typora 的图片 点击文件. 点击偏好设置 ->图像 我们可以选择将图片复制到我们的文件夹中。 建议刚写好文件标题就…

C#键盘钩子(Hook)拦截器的使用

引言 键盘钩子(Hook)是一种机制&#xff0c;允许程序捕获和处理操作系统中的键盘输入。在C#中&#xff0c;我们可以使用键盘钩子来创建一个拦截器&#xff0c;用于拦截特定的键盘事件并执行自定义操作。本文将介绍如何使用C#开发一个键盘钩子拦截器&#xff0c;并给出一些示例代…

算法中的时间复杂度,空间复杂度

一、前言 算法&#xff08;Algorithm&#xff09;是指用来操作数据、解决程序问题的一组方法。对于同一个问题&#xff0c;使用不同的算法&#xff0c;也许最终得到的结果是一样的&#xff0c;但在过程中消耗的资源和时间却会有很大的区别 衡量不同算法之间的优劣主要是通过时…

离线环境下使用百度地图(展示自己的地图瓦片)3.0版本api

案例: 设置覆盖物标注提示文字: <script>// 百度地图API功能var map new BMap.Map("map",{ mapType: BMAP_HYBRID_MAP }); var point new BMap.Point(120.55294, 41.665515); // 创建Map实例map.centerAndZoom(point, 18); // 初始化地图,设置中心点坐标…

Java-多线程基本知识学习总结

多线程 前言一、线程的创建1、继承Thread类2、实现Runnable接口 二、线程的生命周期三、操作线程的方法1、线程的休眠2、线程的加入3、线程的礼让4、线程的优先级 四、线程同步End 前言 Java是支持多线程的编程语言&#xff0c;所谓多线程就是程序能够同时完成多种操作。 计算…