您好,欢迎访问三七文档
当前位置:首页 > 商业/管理/HR > 信息化管理 > Hadoop学习总结之二:HDFS读写过程解析
一、文件的打开1.1、客户端HDFS打开一个文件,需要在客户端调用DistributedFileSystem.open(Pathf,intbufferSize),其实现为:publicFSDataInputStreamopen(Pathf,intbufferSize)throwsIOException{returnnewDFSClient.DFSDataInputStream(dfs.open(getPathName(f),bufferSize,verifyChecksum,statistics));}其中dfs为DistributedFileSystem的成员变量DFSClient,其open函数被调用,其中创建一个DFSInputStream(src,buffersize,verifyChecksum)并返回。在DFSInputStream的构造函数中,openInfo函数被调用,其主要从namenode中得到要打开的文件所对应的blocks的信息,实现如下:synchronizedvoidopenInfo()throwsIOException{LocatedBlocksnewInfo=callGetBlockLocations(namenode,src,0,prefetchSize);this.locatedBlocks=newInfo;this.currentNode=null;}privatestaticLocatedBlockscallGetBlockLocations(ClientProtocolnamenode,Stringsrc,longstart,longlength)throwsIOException{returnnamenode.getBlockLocations(src,start,length);}LocatedBlocks主要包含一个链表的ListLocatedBlockblocks,其中每个LocatedBlock包含如下信息:Blockb:此block的信息longoffset:此block在文件中的偏移量DatanodeInfo[]locs:此block位于哪些DataNode上上面namenode.getBlockLocations是一个RPC调用,最终调用NameNode类的getBlockLocations函数。1.2、NameNodeNameNode.getBlockLocations实现如下:publicLocatedBlocksgetBlockLocations(Stringsrc,longoffset,longlength)throwsIOException{returnnamesystem.getBlockLocations(getClientMachine(),src,offset,length);}namesystem是NameNode一个成员变量,其类型为FSNamesystem,保存的是NameNode的namespace树,其中一个重要的成员变量为FSDirectorydir。FSDirectory和Lucene中的FSDirectory没有任何关系,其主要包括FSImagefsImage,用于读写硬盘上的fsimage文件,FSImage类有成员变量FSEditLogeditLog,用于读写硬盘上的edit文件,这两个文件的关系在上一篇文章中已经解释过。FSDirectory还有一个重要的成员变量INodeDirectoryWithQuotarootDir,INodeDirectoryWithQuota的父类为INodeDirectory,实现如下:publicclassINodeDirectoryextendsINode{……privateListINodechildren;……}由此可见INodeDirectory本身是一个INode,其中包含一个链表的INode,此链表中,如果仍为文件夹,则是类型INodeDirectory,如果是文件,则是类型INodeFile,INodeFile中有成员变量BlockInfoblocks[],是此文件包含的block的信息。显然这是一棵树形的结构。FSNamesystem.getBlockLocations函数如下:publicLocatedBlocksgetBlockLocations(Stringsrc,longoffset,longlength,booleandoAccessTime)throwsIOException{finalLocatedBlocksret=getBlockLocationsInternal(src,dir.getFileINode(src),offset,length,Integer.MAX_VALUE,doAccessTime);returnret;}dir.getFileINode(src)通过路径名从文件系统树中找到INodeFile,其中保存的是要打开的文件的INode的信息。getBlockLocationsInternal的实现如下:privatesynchronizedLocatedBlocksgetBlockLocationsInternal(Stringsrc,INodeFileinode,longoffset,longlength,intnrBlocksToReturn,booleandoAccessTime)throwsIOException{//得到此文件的block信息Block[]blocks=inode.getBlocks();ListLocatedBlockresults=newArrayListLocatedBlock(blocks.length);//计算从offset开始,长度为length所涉及的blocksintcurBlk=0;longcurPos=0,blkSize=0;intnrBlocks=(blocks[0].getNumBytes()==0)?0:blocks.length;for(curBlk=0;curBlknrBlocks;curBlk++){blkSize=blocks[curBlk].getNumBytes();if(curPos+blkSizeoffset){//当offset在curPos和curPos+blkSize之间的时候,curBlk指向offset所在的blockbreak;}curPos+=blkSize;}longendOff=offset+length;//循环,依次遍历从curBlk开始的每个block,直到当前位置curPos越过endOffdo{intnumNodes=blocksMap.numNodes(blocks[curBlk]);intnumCorruptNodes=countNodes(blocks[curBlk]).corruptReplicas();intnumCorruptReplicas=corruptReplicas.numCorruptReplicas(blocks[curBlk]);booleanblockCorrupt=(numCorruptNodes==numNodes);intnumMachineSet=blockCorrupt?numNodes:(numNodes-numCorruptNodes);//依次找到此block所对应的datanode,将其中没有损坏的放入machineSet中DatanodeDescriptor[]machineSet=newDatanodeDescriptor[numMachineSet];if(numMachineSet0){numNodes=0;for(IteratorDatanodeDescriptorit=blocksMap.nodeIterator(blocks[curBlk]);it.hasNext();){DatanodeDescriptordn=it.next();booleanreplicaCorrupt=corruptReplicas.isReplicaCorrupt(blocks[curBlk],dn);if(blockCorrupt||(!blockCorrupt&&!replicaCorrupt))machineSet[numNodes++]=dn;}}//使用此machineSet和当前的block构造一个LocatedBlockresults.add(newLocatedBlock(blocks[curBlk],machineSet,curPos,blockCorrupt));curPos+=blocks[curBlk].getNumBytes();curBlk++;}while(curPosendOff&&curBlkblocks.length&&results.size()nrBlocksToReturn);//使用此LocatedBlock链表构造一个LocatedBlocks对象返回returninode.createLocatedBlocks(results);}1.3、客户端通过RPC调用,在NameNode得到的LocatedBlocks对象,作为成员变量构造DFSInputStream对象,最后包装为FSDataInputStream返回给用户。二、文件的读取2.1、客户端文件读取的时候,客户端利用文件打开的时候得到的FSDataInputStream.read(longposition,byte[]buffer,intoffset,intlength)函数进行文件读操作。FSDataInputStream会调用其封装的DFSInputStream的read(longposition,byte[]buffer,intoffset,intlength)函数,实现如下:publicintread(longposition,byte[]buffer,intoffset,intlength)throwsIOException{longfilelen=getFileLength();intrealLen=length;if((position+length)filelen){realLen=(int)(filelen-position);}//首先得到包含从offset到offset+length内容的block列表//比如对于64M一个block的文件系统来说,欲读取从100M开始,长度为128M的数据,则block列表包括第2,3,4块blockListLocatedBlockblockRange=getBlockRange(position,realLen);intremaining=realLen;//对每一个block,从中读取内容//对于上面的例子,对于第2块block,读取从36M开始,读取长度28M,对于第3块,读取整一块64M,对于第4块,读取从0开始,长度为36M,共128M数据for(LocatedBlockblk:blockRange){longtargetStart=position-blk.getStartOffset();longbytesToRead=Math.min(remaining,blk.getBlockSize()-targetStart);fetchBlockByteRange(blk,targetStart,targetStart+bytesToRead-1,buffer,offset);remaining-=bytesToRead;position+=bytesToRead;offset+=bytesToRead;}assertremaining==0:Wrongnumberofbytesread.;if(stats!=null){stats.incrementBytesRead(realLen)
本文标题:Hadoop学习总结之二:HDFS读写过程解析
链接地址:https://www.777doc.com/doc-4239139 .html