您好,欢迎访问三七文档
当前位置:首页 > 商业/管理/HR > 信息化管理 > hadooprpc客户端初始化和调用过程详解
本文主要记录hadooprpc的客户端部分的初始化和调用的过程,下面的介绍中主要通过DFSClient来说明,为什么用DFSClient呢?DFSClient作为namenode的客户端,通过rpc来操作hdfs。限于篇幅,本文对下文引用到的类,做了较大的剪裁,只给出了关键的部分,如有疑问,可以一起交流。DFSClient的初始化DFSClient的初始化主要看其构造函数,其中rpc部分我们主要关注属性finalClientProtocolnamenode,DFSClient的文件系统操作都是由他代理完成,构造函数中的关键代码如下:?1234567publicDFSClient(URInameNodeUri,ClientProtocolrpcNamenode,Configurationconf,FileSystem.Statisticsstats)throwsIOException{proxyInfo=NameNodeProxies.createProxy(conf,nameNodeUri,ClientProtocol.class);this.dtService=proxyInfo.getDelegationTokenService();this.namenode=proxyInfo.getProxy();}显然,DFSClient中的namenode是一个代理类。接着NameNodeProxies类的createProxy方法,下面给出了NameNodeProxies中需要用到的一些方法:?1234567publicclassNameNodeProxies{publicstaticTProxyAndInfoTcreateProxy(Configurationconf,URInameNodeUri,ClassTxface)throwsIOException{returncreateNonHAProxy(conf,NameNode.getAddress(nameNodeUri),xface,UserGroupInformation.getCurrentUser(),true);}891011121314151617181920212223242526272829303132333435publicstaticTProxyAndInfoTcreateNonHAProxy(Configurationconf,InetSocketAddressnnAddr,ClassTxface,UserGroupInformationugi,booleanwithRetries)throwsIOException{proxy=(T)createNNProxyWithClientProtocol(nnAddr,conf,ugi,withRetries);returnnewProxyAndInfoT(proxy,dtService);}/**这部分是重点*/privatestaticClientProtocolcreateNNProxyWithClientProtocol(InetSocketAddressaddress,Configurationconf,UserGroupInformationugi,booleanwithRetries)throwsIOException{ClientNamenodeProtocolPBproxy=RPC.getProtocolProxy(ClientNamenodeProtocolPB.class,version,address,ugi,conf,NetUtils.getDefaultSocketFactory(conf),org.apache.hadoop.ipc.Client.getTimeout(conf),defaultPolicy).getProxy();proxy=(ClientNamenodeProtocolPB)RetryProxy.create(ClientNamenodeProtocolPB.class,newDefaultFailoverProxyProviderClientNamenodeProtocolPB(ClientNamenodeProtocolPB.class,proxy),methodNameToPolicyMap,defaultPolicy);returnnewClientNamenodeProtocolTranslatorPB(proxy);}}该类中前面两个方法做跳转用,直接看createNNProxyWithClientProtocol方法,这里两行很关键的代码,proxy实例的初始化,这里先提示注意前一行中的getProxy()对于这个方法是需要注意的,这样也保证了类型的一致。这时候就不得不调出RPC这个类来看看他是怎么生成proxy的实例的了,看代码:ProtobufRpcEngineProtobufRpcEngineProtobufRpcEngineProtobufRpcEngine?1publicclassRPC{2345678910111213141516publicstaticTProtocolProxyTgetProtocolProxy(ClassTprotocol,longclientVersion,InetSocketAddressaddr,UserGroupInformationticket,Configurationconf,SocketFactoryfactory,intrpcTimeout,RetryPolicyconnectionRetryPolicy)throwsIOException{if(UserGroupInformation.isSecurityEnabled()){SaslRpcServer.init(conf);}returngetProtocolEngine(protocol,conf).getProxy(protocol,clientVersion,addr,ticket,conf,factory,rpcTimeout,connectionRetryPolicy);}}RPC中还是需要进一步的跳转,但是这里需要注意,getProtocolEngine这个方法,这里做一个说明,查看RpcEngine的依赖,看图:在我的2.4.1的hadoop的版本中,hadoop的序列化框架已经用了Protobuf,所以getProtocolEngine方法得到的是ProtobufRpcEngine类的一个实例,那好,我们进一步跟踪ProtobufRpcEngine类的getProxy方法,看代码:?12publicclassProtobufRpcEngineimplementsRpcEngine{publicTProtocolProxyTgetProxy(ClassTprotocol,longclientVersion,34567891011InetSocketAddressaddr,UserGroupInformationticket,Configurationconf,SocketFactoryfactory,intrpcTimeout,RetryPolicyconnectionRetryPolicy)throwsIOException{finalInvokerinvoker=newInvoker(protocol,addr,ticket,conf,factory,rpcTimeout,connectionRetryPolicy);returnnewProtocolProxyT(protocol,(T)Proxy.newProxyInstance(protocol.getClassLoader(),newClass[]{protocol},invoker),false);}}对java的动态代理有点了解的人看到Proxy.newProxyInstance这个方法应该都很清楚这就是生成一个远程代理类实例(特别注意:在NameNodeProxies类的createNNProxyWithClientProtocol方法中getProxy方法拿到的对象也就是这个对象),其中的invoker参数,确实我们不能忽略的,因为他暗藏玄机,java的动态代理中,invoker的类需要实现InvocationHandler接口,该接口只听过一个方法invoke,共代理类使用,及通过Proxy.newProxyInstance生成的代理类,在使用的时候是通过InvocationHandler的invoke方法来起作用的。好吧,现在我们可以顺便看看在ProtobufRpcEngine类的getProxy方法中invoker局部变量的类依赖图:,显然有刚才提到的实现关系,现在再让我们看看Invoker的内部,包括构造函数和invoke方法:?1234567891011121314privateInvoker(Class?protocol,Client.ConnectionIdconnId,Configurationconf,SocketFactoryfactory){this.remoteId=connId;this.client=CLIENTS.getClient(conf,factory,RpcResponseWrapper.class);this.protocolName=RPC.getProtocolName(protocol);this.clientProtocolVersion=RPC.getProtocolVersion(protocol);}publicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsServiceException{val=(RpcResponseWrapper)client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,newRpcRequestWrapper(rpcRequestHeader,theRequest),remoteId);}在构造函数请注意一个属性client,他的类型正式org.apache.hadoop.ipc.Client,而且在invoke方法中发起远程调用的正是这个client属性,能够读到这里的同学,相信应该比较清楚了,在DFSClient中发起远程访问的就是这个Client类的实例。关于DFSClient的初始化阶段中关于rpc的部分,总结一句,就是创建一个namenode的代理对象,供后续的文件系统操作调用。DFSClient的getFileLinkInfo方法DFSClient提供了相当丰富的API供客户端操作hadoop的文件系统,这里以getFileLinkInfo为例,讲解rpc客户端的调用过程。注意:如果是FileSystem类的话,请使用方法getFileLinkStatus,他对DFSClient提供的getFileLinkInfo做了一层包装,仅此而已。直接看DFSClient中的代码:?1publicHdfsFileStatusgetFileLinkInfo(Stringsrc)throwsIOException{23456789checkOpen();try{returnnamenode.getFileLinkInfo(src);}catch(RemoteExceptionre){throwre.unwrapRemoteException(AccessControlException.class,UnresolvedPathException.class);}}很简答的一行代码,通过namenode属性的调用操作完成,看了DFSClient的初始化过程,我们很容易知道namenode的实例化类是Clie
本文标题:hadooprpc客户端初始化和调用过程详解
链接地址:https://www.777doc.com/doc-2875680 .html