From the demo above we can see that only the RPC protocol itself is something we implement; both the server and the client are obtained through interfaces that RPC provides. Using that demo as the entry point, let's read the RPC-related source code. The RPC classes live in the ipc package of the common module, where ipc is short for inter-process communication. RPC mainly exposes a few programming interfaces for obtaining a server and a client, wrapping the underlying client-server network model. A client can be obtained through two methods provided by RPC: getProxy and waitForProxy.
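Before diving into the source, here is a minimal sketch of what obtaining and using a proxy looks like. MyProtocol, its versionID, and the server address are hypothetical stand-ins for illustration; RPC.getProxy and RPC.stopProxy are the real Hadoop API (getProxy fails fast if the server is down, waitForProxy retries instead).

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

public class RpcClientSketch {
  // Hypothetical protocol definition; real protocols declare a versionID field.
  public interface MyProtocol extends VersionedProtocol {
    long versionID = 1L;
    String echo(String msg) throws java.io.IOException;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InetSocketAddress addr = new InetSocketAddress("localhost", 9000);
    MyProtocol proxy =
        RPC.getProxy(MyProtocol.class, MyProtocol.versionID, addr, conf);
    try {
      System.out.println(proxy.echo("ping"));
    } finally {
      RPC.stopProxy(proxy); // release the underlying connection
    }
  }
}

With a proxy in hand, every method call travels through a Client.Connection under the hood. setupIOstreams() shows how that connection's I/O streams are established: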
/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
private synchronized void setupIOstreams(
    AtomicBoolean fallbackToSimpleAuth) {
  if (socket != null || shouldCloseConnection.get()) {
    return;
  }
  try {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Connecting to " + server);
    }
    if (Trace.isTracing()) {
      Trace.addTimelineAnnotation("IPC client connecting to " + server);
    }
    short numRetries = 0;
    Random rand = null;
    while (true) {
      // Establish the socket connection; the client clearly talks to the
      // server over a plain socket
      setupConnection();
      InputStream inStream = NetUtils.getInputStream(socket);
      OutputStream outStream = NetUtils.getOutputStream(socket);
      // Write the connection header - this is sent when connection is established
      writeConnectionHeader(outStream);
      if (authProtocol == AuthProtocol.SASL) {
        ...
      }
      // Wrap the input stream so read timeouts can trigger a ping message.
      // If ping is not enabled (via ipc.client.ping), then the timeout value
      // is the same as the pingInterval: the RPC times out after that period.
      // If ping is enabled, then there is no timeout value.
      if (doPing) {
        inStream = new PingInputStream(inStream);
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));
      // SASL may have already buffered the stream
      if (!(outStream instanceof BufferedOutputStream)) {
        outStream = new BufferedOutputStream(outStream);
      }
      this.out = new DataOutputStream(outStream);

      // Write the connection context. Note that the serialization protocol
      // for the connectionContextHeader is hard-coded to protobuf:
      // RpcRequestHeaderProto connectionContextHeader = ProtoUtil
      //     .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
      //         OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
      //         RpcConstants.INVALID_RETRY_COUNT, clientId);
      // This is because the RPC header layer is always protobuf-based in this
      // version of Hadoop, regardless of which RpcEngine serializes the
      // payload. How this differs from writeConnectionHeader(): the connection
      // header is the fixed preamble ("hrpc", version, service class, auth
      // protocol; see the layout in readAndProcess() below), while the
      // connection context is a protobuf message carrying the protocol name
      // and user information, sent once as the pseudo-call
      // CONNECTION_CONTEXT_CALL_ID.
      writeConnectionContext(remoteId, authMethod);
      // update last activity time
      touch();

      if (Trace.isTracing()) {
        Trace.addTimelineAnnotation("IPC client connected to " + server);
      }

      // start the receiver thread after the socket connection has been set
      // up: the Connection thread waits to receive the server's responses
      start();
      return;
    }
  } catch (Throwable t) {
    if (t instanceof IOException) {
      markClosed((IOException) t);
    } else {
      markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
  }
}
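For reference, this is roughly what writeConnectionHeader() puts on the wire, reconstructed from the header layout shown in readAndProcess() further below. It is a sketch, not the Hadoop implementation; the real constants live in RpcConstants and Server.AuthProtocol, and the values in the comments are quoted from that source.

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class ConnectionHeaderSketch {
  static void writeConnectionHeader(OutputStream rawOut,
      byte version, byte serviceClass, byte authProtocol) throws IOException {
    DataOutputStream out = new DataOutputStream(rawOut);
    out.write("hrpc".getBytes(StandardCharsets.UTF_8)); // 4-byte magic
    out.writeByte(version);       // e.g. RpcConstants.CURRENT_VERSION
    out.writeByte(serviceClass);  // QoS hint, 0 by default
    out.writeByte(authProtocol);  // AuthProtocol.NONE = 0, SASL = -33
    out.flush();
  }
}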
private synchronized void setupConnection() throws IOException {
  short ioFailures = 0;
  short timeoutFailures = 0;
  while (true) {
    try {
      // Create the socket
      this.socket = socketFactory.createSocket();
      this.socket.setTcpNoDelay(tcpNoDelay);
      this.socket.setKeepAlive(true);
      /*
       * Bind the socket to the host specified in the principal name of the
       * client, to ensure Server matching address of the client connection
       * to host name in principal passed.
       */
      UserGroupInformation ticket = remoteId.getTicket();
      if (ticket != null && ticket.hasKerberosCredentials()) {
        KerberosInfo krbInfo =
            remoteId.getProtocol().getAnnotation(KerberosInfo.class);
        if (krbInfo != null && krbInfo.clientPrincipal() != null) {
          String host = SecurityUtil.getHostFromPrincipal(
              remoteId.getTicket().getUserName());
          // If host name is a valid local address then bind socket to it
          InetAddress localAddr = NetUtils.getLocalInetAddress(host);
          if (localAddr != null) {
            this.socket.bind(new InetSocketAddress(localAddr, 0));
          }
        }
      }
      // server is assigned in the Connection constructor:
      //   this.server = remoteId.getAddress();
      // NetUtils.connect() then connects the socket to that address
      NetUtils.connect(this.socket, server, connectionTimeout);
      if (rpcTimeout > 0) {
        pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
      }
      this.socket.setSoTimeout(pingInterval);
      return;
    } catch (ConnectTimeoutException toe) {
      ...
    } catch (IOException ie) {
      ...
    }
  }
}
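Stripped of the Kerberos handling, setupConnection() boils down to a few java.net calls. A standalone sketch of the same sequence, with the timeout parameters as assumptions:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class SocketSetupSketch {
  static Socket connect(InetSocketAddress server, int connectionTimeout,
      int pingInterval, int rpcTimeout) throws IOException {
    Socket socket = new Socket();
    socket.setTcpNoDelay(true);   // small RPC frames, so disable Nagle
    socket.setKeepAlive(true);
    socket.connect(server, connectionTimeout); // NetUtils.connect equivalent
    // Blocking reads wake up after SO_TIMEOUT; rpcTimeout wins when set
    int soTimeout = rpcTimeout > 0 ? rpcTimeout : pingInterval;
    socket.setSoTimeout(soTimeout);
    return socket;
  }
}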
After start() is called at the end of setupIOstreams(), the Connection thread enters run(), which loops waiting for responses from the server:

public void run() {
  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": starting, having connections "
        + connections.size());
  try {
    while (waitForWork()) { // wait here for work - read or close connection
      receiveRpcResponse();
    }
  } catch (Throwable t) {
    // This truly is unexpected, since we catch IOException in receiveResponse
    // -- this is only to be really sure that we don't leave a client hanging
    // forever.
    LOG.warn("Unexpected error reading responses on connection " + this, t);
    markClosed(new IOException("Error reading responses", t));
  }
  close();

  if (LOG.isDebugEnabled())
    LOG.debug(getName() + ": stopped, remaining connections "
        + connections.size());
}
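The doPing wrapping seen in setupIOstreams() deserves a note here: with ping enabled, a read timeout is not an error but a cue to ping the server and keep waiting. A sketch of that idea, with a hypothetical sendPing hook standing in for the client's real ping logic (this is not the actual PingInputStream code):

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;

class PingingInputStream extends FilterInputStream {
  private final Runnable sendPing; // hypothetical hook that writes a ping frame

  PingingInputStream(InputStream in, Runnable sendPing) {
    super(in);
    this.sendPing = sendPing;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    while (true) {
      try {
        return super.read(buf, off, len);
      } catch (SocketTimeoutException e) {
        sendPing.run(); // timeout means "time to ping", not failure
      }
    }
  }
}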
The tail of Client.call() returns the response once the Connection thread has filled it in; the sending side is sendRpcRequest():

if (call.error != null) {
  ...
} else {
  // the result returned by the server
  return call.getRpcResponse();
}
}
}
...
public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  if (shouldCloseConnection.get()) {
    return;
  }
  // Serialize the call to be sent. This is done from the actual
  // caller thread, rather than the sendParamsExecutor thread,
  // so that if the serialization throws an error, it is reported
  // properly. This also parallelizes the serialization.
  //
  // Format of a call on the wire:
  // 0) Length of rest below (1 + 2)
  // 1) RpcRequestHeader - is serialized Delimited hence contains length
  // 2) RpcRequest
  //
  // Items '1' and '2' are prepared here.
  final DataOutputBuffer d = new DataOutputBuffer();
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  header.writeDelimitedTo(d);
  call.rpcRequest.write(d);
  synchronized (sendRpcRequestLock) {
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      @Override
      public void run() {
        try {
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }
            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);
            byte[] data = d.getData();
            int totalLength = d.getLength();
            out.writeInt(totalLength);       // Total Length
            out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          // the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });
    try {
      // Block until the call has actually been sent, then return to call()
      // and continue from there
      senderFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      // cause should only be a RuntimeException as the Runnable above
      // catches IOException
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        throw new RuntimeException("unexpected checked exception", cause);
      }
    }
  }
}
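The wire format from the comment above (one 4-byte total length, then the varint-delimited header bytes, then the request bytes) can be shown with plain byte arrays standing in for the protobuf header and the Writable request; a minimal sketch:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class FramingSketch {
  static byte[] frame(byte[] delimitedHeader, byte[] request)
      throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(delimitedHeader.length + request.length); // total length
    out.write(delimitedHeader); // RpcRequestHeaderProto, already delimited
    out.write(request);         // serialized RpcRequest
    out.flush();
    return bos.toByteArray();
  }
}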
Now to the server side. The Listener thread owns the accepting socket; its constructor opens a non-blocking ServerSocketChannel and starts the Reader threads:

public Listener() throws IOException {
  address = new InetSocketAddress(bindAddress, port);
  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);
  // Bind the server socket to the local host and port
  bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
  port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
  // create a selector
  selector = Selector.open();
  // Create the Reader array; the default size is 1
  readers = new Reader[readThreads];
  for (int i = 0; i < readThreads; i++) {
    Reader reader = new Reader(
        "Socket Reader #" + (i + 1) + " for port " + port);
    readers[i] = reader;
    reader.start();
  }
  // Register accepts on the server socket with the selector.
  acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  this.setName("IPC Server listener on " + port);
  this.setDaemon(true);
}
public synchronized void start() {
  responder.start();
  listener.start();
  // Start the Handler threads; handlerCount is set when the Server is
  // constructed
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}
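For completeness, handlerCount comes from how the server is built. A hedged sketch using the Hadoop 2.x RPC.Builder API, reusing the hypothetical MyProtocol from the client sketch above (the port and handler count are arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

final class RpcServerSketch {
  static RPC.Server build(Configuration conf, Object protocolImpl)
      throws Exception {
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(RpcClientSketch.MyProtocol.class)
        .setInstance(protocolImpl)   // object implementing MyProtocol
        .setBindAddress("0.0.0.0")
        .setPort(9000)
        .setNumHandlers(10)          // number of Handler threads in start()
        .build();
    server.start(); // starts responder, listener, and the handlers shown above
    return server;
  }
}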
public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  SERVER.set(Server.this); // SERVER is a ThreadLocal<Server>
  connectionManager.startIdleScan();
  while (running) {
    SelectionKey key = null;
    try {
      // block until a registered channel becomes ready
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              // SelectionKey.OP_ACCEPT events are handled by doAccept()
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      connectionManager.closeIdle(true);
      try {
        Thread.sleep(60000);
      } catch (Exception ie) {
      }
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  ...
}
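doAccept() is not quoted here; in the Hadoop source it accepts every pending connection, makes the channel non-blocking, and hands it to one of the Readers round-robin. A simplified sketch of that hand-off, where Reader is a stand-in interface rather than the real inner class:

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptSketch {
  interface Reader {
    void addConnection(SocketChannel channel) throws InterruptedException;
  }

  private final Reader[] readers;
  private int currentReader = 0;

  AcceptSketch(Reader[] readers) { this.readers = readers; }

  void doAccept(SelectionKey key) throws IOException, InterruptedException {
    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    while ((channel = serverChannel.accept()) != null) {
      channel.configureBlocking(false);
      channel.socket().setTcpNoDelay(true);
      Reader reader = readers[currentReader];
      currentReader = (currentReader + 1) % readers.length; // round-robin
      reader.addConnection(channel); // enqueue and wake the reader's selector
    }
  }
}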
private synchronized void doRunLoop() {
  while (running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i = size; i > 0; i--) {
        // pendingConnections is a BlockingQueue
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      // this call blocks; readSelector.wakeup() is used to break out of it
      readSelector.select();
      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        if (key.isValid()) {
          if (key.isReadable()) {
            doRead(key);
          }
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (running) { // unexpected -- log it
        LOG.info(Thread.currentThread().getName()
            + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    }
  }
}
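The pendingConnections queue plus readSelector.wakeup() is the standard answer to a known NIO constraint: channel.register() blocks while another thread is inside select() on the same selector, so the Listener never registers directly. A compact sketch of both sides of that handshake, with simplified types:

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ReaderQueueSketch {
  private final Selector readSelector;
  private final BlockingQueue<SocketChannel> pendingConnections =
      new LinkedBlockingQueue<>();

  ReaderQueueSketch(Selector readSelector) { this.readSelector = readSelector; }

  // Called from the Listener thread.
  void addConnection(SocketChannel channel) throws InterruptedException {
    pendingConnections.put(channel);
    readSelector.wakeup(); // break the Reader out of select()
  }

  // Called at the top of the Reader's own loop, before select().
  void drainPending() throws IOException {
    SocketChannel channel;
    while ((channel = pendingConnections.poll()) != null) {
      channel.register(readSelector, SelectionKey.OP_READ);
    }
  }
}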
void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  Connection c = (Connection) key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());
  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName()
        + ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // a WrappedRpcServerException is an exception that has been sent
    // to the client, so the stacktrace is unnecessary; any other
    // exceptions are unexpected internal server errors and thus the
    // stacktrace should be logged
    LOG.info(Thread.currentThread().getName()
        + ": readAndProcess from client " + c.getHostAddress()
        + " threw exception [" + e + "]",
        (e instanceof WrappedRpcServerException) ? null : e);
    count = -1; // so that the (count < 0) block is executed
  }
  if (count < 0) {
    closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}
public int readAndProcess()
    throws WrappedRpcServerException, IOException, InterruptedException {
  while (true) {
    /* Read at most one RPC. If the header is not read completely yet
     * then iterate until we read first RPC or until there is no data left.
     */
    int count = -1;
    // this.dataLengthBuffer = ByteBuffer.allocate(4)
    if (dataLengthBuffer.remaining() > 0) {
      count = channelRead(channel, dataLengthBuffer);
      if (count < 0 || dataLengthBuffer.remaining() > 0)
        return count;
    }
    // Read the RPC connection header, which has four parts:
    /**
     * Write the connection header - this is sent when connection is established
     * +----------------------------------+
     * | "hrpc" 4 bytes                   |  <- read by the if block above
     * +----------------------------------+
     * | Version (1 byte)                 |
     * +----------------------------------+
     * | Service Class (1 byte)           |
     * +----------------------------------+
     * | AuthProtocol (1 byte)            |
     * +----------------------------------+
     */
    if (!connectionHeaderRead) {
      // Every connection is expected to send the header.
      if (connectionHeaderBuf == null) {
        connectionHeaderBuf = ByteBuffer.allocate(3);
      }
      // Read out Version, Service Class, and AuthProtocol
      count = channelRead(channel, connectionHeaderBuf);
      if (count < 0 || connectionHeaderBuf.remaining() > 0) {
        return count;
      }
      int version = connectionHeaderBuf.get(0);
      // TODO we should add handler for service class later
      this.setServiceClass(connectionHeaderBuf.get(1));
      dataLengthBuffer.flip();
      // Check if it looks like the user is hitting an IPC port
      // with an HTTP GET - this is a common error, so we can
      // send back a simple string indicating as much.
      if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
        setupHttpRequestOnIpcPortResponse();
        return -1;
      }
      if (!RpcConstants.HEADER.equals(dataLengthBuffer)
          || version != CURRENT_VERSION) {
        // Warning is ok since this is not supposed to happen.
        LOG.warn("Incorrect header or version mismatch from " + hostAddress
            + ":" + remotePort + " got version " + version
            + " expected version " + CURRENT_VERSION);
        setupBadVersionResponse(version);
        return -1;
      }
      // this may switch us into SIMPLE
      authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
      dataLengthBuffer.clear();
      connectionHeaderBuf = null;
      connectionHeaderRead = true;
      // the RPC header is done; go around the loop for the next piece
      continue;
    }
    // Read the body of the RPC request
    if (data == null) {
      dataLengthBuffer.flip();
      dataLength = dataLengthBuffer.getInt();
      checkDataLength(dataLength);
      data = ByteBuffer.allocate(dataLength);
    }
    count = channelRead(channel, data);
    if (data.remaining() == 0) {
      dataLengthBuffer.clear();
      data.flip();
      boolean isHeaderRead = connectionContextRead;
      // authorization happens first, then the request body is processed
      processOneRpc(data.array());
      data = null;
      if (!isHeaderRead) {
        // the first payload is the connection context; once it is accepted,
        // connectionContextRead becomes true and we loop for the real request
        continue;
      }
    }
    return count;
  }
}
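readAndProcess() is effectively a small state machine over partial reads: a 4-byte length buffer fills first, then a body buffer of exactly that size, and a short read simply returns so the same buffers resume on the next OP_READ event. A stripped-down sketch of that pattern, with the connection-header handling omitted:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

final class FrameReaderSketch {
  private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
  private ByteBuffer data; // allocated once the length is known

  /** Returns the complete frame, or null if more bytes are still needed. */
  byte[] readFrame(SocketChannel channel) throws IOException {
    if (lengthBuf.hasRemaining()) {
      if (channel.read(lengthBuf) < 0) throw new IOException("EOF");
      if (lengthBuf.hasRemaining()) return null; // partial length, try later
      lengthBuf.flip();
      data = ByteBuffer.allocate(lengthBuf.getInt()); // checkDataLength() here
    }
    if (channel.read(data) < 0) throw new IOException("EOF");
    if (data.hasRemaining()) return null; // partial body, try later
    byte[] frame = data.array();
    lengthBuf.clear(); // reset the state machine for the next frame
    data = null;
    return frame;
  }
}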
private void processOneRpc(byte[] buf)
    throws IOException, WrappedRpcServerException, InterruptedException {
  int callId = -1;
  int retry = RpcConstants.INVALID_RETRY_COUNT;
  try {
    final DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
    final RpcRequestHeaderProto header =
        decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
    callId = header.getCallId();
    retry = header.getRetryCount();
    if (LOG.isDebugEnabled()) {
      LOG.debug(" got #" + callId);
    }
    checkRpcHeaders(header);
    // On the first exchange the callId is CONNECTION_CONTEXT_CALL_ID (-3):
    // the out-of-band path handles authorization, and on success sets
    // connectionContextRead to true
    if (callId < 0) { // callIds typically used during connection setup
      processRpcOutOfBandRequest(header, dis);
    } else if (!connectionContextRead) {
      throw new WrappedRpcServerException(
          RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
          "Connection context not established");
    } else {
      // normal requests take this path
      processRpcRequest(header, dis);
    }
  } catch (WrappedRpcServerException wrse) { // inform client of error
    Throwable ioe = wrse.getCause();
    final Call call = new Call(callId, retry, null, this);
    setupResponse(authFailedResponse, call, RpcStatusProto.FATAL,
        wrse.getRpcErrorCodeProto(), null, ioe.getClass().getName(),
        ioe.getMessage());
    responder.doRespond(call);
    throw wrse;
  }
}
processRpcRequest() wraps the decoded request in a Call and enqueues it on callQueue; the Handler threads pick the calls up from there:

public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  SERVER.set(Server.this);
  ByteArrayOutputStream buf =
      new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
  while (running) {
    TraceScope traceScope = null;
    try {
      // take a call off the queue; this may block
      final Call call = callQueue.take();
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call
            + " for RpcKind " + call.rpcKind);
      }
      if (!call.connection.channel.isOpen()) {
        LOG.info(Thread.currentThread().getName() + ": skipped " + call);
        continue;
      }
      String errorClass = null;
      String error = null;
      RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
      RpcErrorCodeProto detailedErr = null;
      Writable value = null;
      CurCall.set(call);
      if (call.traceSpan != null) {
        traceScope = Trace.continueSpan(call.traceSpan);
      }
      try {
        // Make the call as the user via Subject.doAs, thus associating
        // the call with the Subject
        if (call.connection.user == null) {
          // dispatch the call directly
          value = call(call.rpcKind, call.connection.protocolName,
              call.rpcRequest, call.timestamp);
        } else {
          value = call.connection.user.doAs(
              new PrivilegedExceptionAction<Writable>() {
                @Override
                public Writable run() throws Exception {
                  // make the call
                  return call(call.rpcKind, call.connection.protocolName,
                      call.rpcRequest, call.timestamp);
                }
              });
        }
      } catch (Throwable e) {
        ...
      }
      CurCall.set(null);
      synchronized (call.connection.responseQueue) {
        // setupResponse() needs to be sync'ed together with
        // responder.doResponse() since setupResponse may use
        // SASL to encrypt response data and SASL enforces
        // its own message ordering.
        setupResponse(buf, call, returnStatus, detailedErr,
            value, errorClass, error);
        // Discard the large buf and reset it back to smaller size
        // to free up heap
        if (buf.size() > maxRespSize) {
          LOG.warn("Large response size " + buf.size() + " for call "
              + call.toString());
          buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
        }
        // hand the finished call to the Responder; it goes onto the
        // connection's responseQueue
        responder.doRespond(call);
      }
    } catch (InterruptedException e) {
      ...
    } catch (Exception e) {
      ...
    } finally {
      if (traceScope != null) {
        traceScope.close();
      }
      IOUtils.cleanup(LOG, traceScope);
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}
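The Handler side is a classic bounded-consumer arrangement: handlerCount threads all block on callQueue.take(), which caps how many RPCs execute concurrently. A minimal sketch of the pattern, with Runnable standing in for Call and an arbitrary queue capacity:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class HandlerPoolSketch {
  private final BlockingQueue<Runnable> callQueue =
      new LinkedBlockingQueue<>(100);

  void start(int handlerCount) {
    for (int i = 0; i < handlerCount; i++) {
      Thread handler = new Thread(() -> {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            callQueue.take().run(); // blocks until a call is queued
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shut down cleanly
          }
        }
      }, "Handler-" + i);
      handler.setDaemon(true);
      handler.start();
    }
  }

  void submit(Runnable call) throws InterruptedException {
    callQueue.put(call); // producers block here when all handlers are busy
  }
}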
The Responder thread is responsible for writing queued responses back to their clients:

private void doRunLoop() {
  long lastPurgeTime = 0; // last check for old calls.
  while (running) {
    try {
      waitPending(); // If a channel is being registered, wait.
      writeSelector.select(PURGE_INTERVAL);
      Iterator<SelectionKey> iter =
          writeSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        try {
          if (key.isValid() && key.isWritable()) {
            // "async" because the Handler only writes what fits inline;
            // any leftover is finished here, driven by OP_WRITE readiness
            // instead of blocking a Handler thread
            doAsyncWrite(key);
          }
        } catch (IOException e) {
          LOG.info(Thread.currentThread().getName()
              + ": doAsyncWrite threw exception " + e);
        }
      }
      long now = Time.now();
      if (now < lastPurgeTime + PURGE_INTERVAL) {
        continue;
      }
      lastPurgeTime = now;
      //
      // If there were some calls that have not been sent out for a
      // long time, discard them.
      //
      if (LOG.isDebugEnabled()) {
        LOG.debug("Checking for old call responses.");
      }
      ArrayList<Call> calls;
      // get the list of channels from the full set of registered keys
      synchronized (writeSelector.keys()) {
        calls = new ArrayList<Call>(writeSelector.keys().size());
        iter = writeSelector.keys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          Call call = (Call) key.attachment();
          // these are calls whose responses were only partially written
          // and therefore went back onto the responseQueue
          if (call != null && key.channel() == call.connection.channel) {
            calls.add(call);
          }
        }
      }
      for (Call call : calls) {
        doPurge(call, now);
      }
    } catch (OutOfMemoryError e) {
      //
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      //
      LOG.warn("Out of Memory in server select", e);
      try {
        Thread.sleep(60000);
      } catch (Exception ie) {
      }
    } catch (Exception e) {
      LOG.warn("Exception in Responder", e);
    }
  }
}
private void doAsyncWrite(SelectionKey key) throws IOException {
  Call call = (Call) key.attachment();
  if (call == null) {
    return;
  }
  if (key.channel() != call.connection.channel) {
    throw new IOException("doAsyncWrite: bad channel");
  }
  synchronized (call.connection.responseQueue) {
    if (processResponse(call.connection.responseQueue, false)) {
      try {
        key.interestOps(0);
      } catch (CancelledKeyException e) {
        /* The Listener/reader might have closed the socket.
         * We don't explicitly cancel the key, so not sure if this will
         * ever fire.
         * This warning could be removed.
         */
        LOG.warn("Exception while changing ops : " + e);
      }
    }
  }
}
// Processes one response. Returns true if there are no more pending
// data for this channel.
private boolean processResponse(LinkedList<Call> responseQueue,
    boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false; // there is more data for this channel.
  int numElements = 0;
  Call call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true; // no more data for this channel.
      }
      // Extract the first call
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": responding to "
            + call);
      }
      //
      // Send as much data as we can in the non-blocking fashion
      //
      int numBytes = channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      // If call.rpcResponse has been written out completely, clear it.
      // If not, put the call back at the head of the responseQueue; when
      // the Handler could not send the whole result in one shot, a
      // SelectionKey.OP_WRITE event is registered with the write selector
      if (!call.rpcResponse.hasRemaining()) {
        // Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) { // last call fully processes.
          done = true;  // no more data for this channel.
        } else {
          done = false; // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to "
              + call + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        call.connection.responseQueue.addFirst(call);
        if (inHandler) {
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();
          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            // Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to "
              + call + " Wrote partial " + numBytes + " bytes.");
        }
      }
      error = false; // everything went off well
    }
  } finally {
    if (error && call != null) {
      LOG.warn(Thread.currentThread().getName() + ", call " + call
          + ": output error");
      done = true; // error. no more data for this channel.
      closeConnection(call.connection);
    }
  }
  return done;
}
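The heart of processResponse() is the partial-write pattern: write whatever the socket accepts now, and if bytes remain, register OP_WRITE so the Responder finishes later instead of blocking a Handler thread. A compact sketch of that write path, assuming the channel is already non-blocking:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class PartialWriteSketch {
  /**
   * Writes as much as the socket accepts. If the buffer is not drained,
   * registers OP_WRITE so a responder thread can finish asynchronously.
   */
  static boolean writeOrRegister(SocketChannel channel, ByteBuffer response,
      Selector writeSelector) throws IOException {
    channel.write(response); // non-blocking; may write only part of the buffer
    if (!response.hasRemaining()) {
      return true; // fully sent inline, nothing left for the responder
    }
    // Wake the selector first; otherwise register() blocks against select().
    writeSelector.wakeup();
    channel.register(writeSelector, SelectionKey.OP_WRITE, response);
    return false;
  }
}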
Back on the client side, the Connection thread's loop ends up in receiveRpcResponse():

/* Receive a response.
 * Because only one receiver, so no synchronization on in.
 */
private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  touch();
  try {
    int totalLen = in.readInt();
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    checkResponse(header);
    int headerLen = header.getSerializedSize();
    headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);
    Call call = calls.get(callId);
    RpcStatusProto status = header.getStatus();
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in); // read value
      calls.remove(callId);
      // set the value as this call's response
      call.setRpcResponse(value);
      // verify that length was correct
      // only for ProtobufEngine where len can be verified easily
      if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
        ProtobufRpcEngine.RpcWrapper resWrapper =
            (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
        if (totalLen != headerLen + resWrapper.getLength()) {
          throw new RpcClientException(
              "RPC response length mismatch on rpc success");
        }
      }
    } else { // Rpc Request failed
      ...
    }
  } catch (IOException e) {
    markClosed(e);
  }
}

public synchronized void setRpcResponse(Writable rpcResponse) {
  this.rpcResponse = rpcResponse;
  callComplete();
}
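receiveRpcResponse() works because every request carries a call id and the client keeps a map of in-flight calls, so responses can arrive in any order and still find their waiter. A sketch of that multiplexing, with CompletableFuture standing in for the Call object's wait/notify:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CallTableSketch {
  private final AtomicInteger nextId = new AtomicInteger();
  private final ConcurrentMap<Integer, CompletableFuture<byte[]>> calls =
      new ConcurrentHashMap<>();

  /** Caller side: allocate an id and remember the pending call. */
  int register(CompletableFuture<byte[]> future) {
    int id = nextId.getAndIncrement();
    calls.put(id, future);
    return id; // this id is written into the request header
  }

  /** Receiver thread: route a decoded response to the matching call. */
  void complete(int callId, byte[] response) {
    CompletableFuture<byte[]> future = calls.remove(callId);
    if (future != null) {
      future.complete(response); // wakes the thread blocked in call()
    }
  }
}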