RPC

An RPC framework consists of four modules:
1. Communication module.
Two cooperating communication modules implement the request-response protocol: they carry request and response messages between client and server, and generally do not inspect or modify the packets. The request-response protocol can be implemented synchronously or asynchronously. In synchronous mode the client blocks until the server's response arrives; in asynchronous mode the client sends the request, is free to do other work, and is notified by the server once the request has been processed. High-concurrency applications usually adopt the asynchronous mode to lower latency and improve bandwidth utilization (see the sketch after this list).
2. Stub (proxy) programs.
Both the client and the server contain a Stub, which can be thought of as a proxy. It makes a remote call look exactly like a local one, completely transparently to user code. On the client side it behaves like a local program, but rather than executing the call locally it sends the request through the communication module to the server, and when the server's response arrives it decodes the result. On the server side, the Stub decodes the parameters in the request message, invokes the corresponding service procedure, and encodes the return value into the response.
3. Dispatcher.
The dispatcher receives request messages from the communication module and, based on an identifier in each message, selects a Stub to handle it. When client concurrency is high, a thread pool is usually used to improve throughput.
4. Client program / service procedure.
The issuer of the request and the handler of the request, respectively.
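To make the synchronous/asynchronous distinction concrete, here is a minimal sketch (my own illustration, not Hadoop code; slowCall is a stand-in for a remote call) contrasting a blocking call with a CompletableFuture-based asynchronous call:

import java.util.concurrent.CompletableFuture;

public class SyncVsAsyncDemo {
    // Stand-in for a remote call that takes a while to return.
    static String slowCall(String arg) {
        try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
        return "reply:" + arg;
    }

    public static void main(String[] args) {
        // Synchronous mode: the caller blocks until the reply arrives.
        System.out.println(slowCall("ping"));

        // Asynchronous mode: send the request, keep working, and let a
        // callback run when the reply arrives.
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> slowCall("ping"));
        System.out.println("doing other work while waiting...");
        future.thenAccept(System.out::println);
        future.join(); // only to keep the demo alive until the reply lands
    }
}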

In general, an RPC request goes through the following steps from being sent to obtaining its result (a toy stub sketch follows the list):

  1. The client program invokes the system-generated Stub program as if it were a local call;
  2. The Stub packs the call information into a message in the format required by the communication module and hands it over for transmission to the remote server;
  3. The remote server receives the message and passes it to the corresponding Stub program;
  4. The Stub unpacks the message into the form the called procedure expects and invokes the corresponding function;
  5. The called function executes with the given parameters and returns its result to the Stub;
  6. The Stub packs the result into a message and sends it back through the communication module, level by level, to the client program.
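As a toy illustration of steps 1, 2, and 6 (purely my own sketch, not Hadoop code; the echo method and the wire format are made up), a hand-written client stub might look like this:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

// A hand-rolled client stub: it looks like a local method to the caller,
// but actually marshals the call, ships it over a socket, and unmarshals
// the reply.
public class ToyEchoStub {
    private final String host;
    private final int port;

    public ToyEchoStub(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public String echo(String msg) throws IOException {
        try (Socket s = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(s.getOutputStream());
             DataInputStream in = new DataInputStream(s.getInputStream())) {
            out.writeUTF("echo"); // method name
            out.writeUTF(msg);    // argument
            out.flush();
            return in.readUTF();  // blocking wait for the server's reply
        }
    }
}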

Hadoop RPC

Hadoop RPC is divided into four parts: the serialization layer, the function-call layer, the network transport layer, and the server-side processing framework. Concretely:

Serialization layer. Serialization converts structured objects into byte streams so they can be sent over the network or written to persistent storage; in an RPC framework it mainly turns the parameters of a request, or the response, into byte streams for cross-machine transport. Since Hadoop 2.0 the main choices are Protocol Buffers and Apache Avro, but Hadoop also ships its own serialization framework: any class that implements the Writable interface supports serialization and deserialization (see the sketch after this list).
Function-call layer. This layer locates the function to invoke and executes it; Hadoop RPC implements it with Java reflection on the server side and dynamic proxies on the client side.
Network transport layer. This layer defines how messages travel between Client and Server; Hadoop RPC uses a Socket mechanism on top of TCP/IP.
Server-side processing framework. This can be abstracted as the network I/O model; it describes how client and server exchange data, and its design directly determines the server's concurrent processing capacity. Common network I/O models include blocking I/O, non-blocking I/O, and event-driven I/O; Hadoop RPC uses an event-driven I/O model (Java NIO) based on the Reactor design pattern.
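As a concrete illustration of the Writable contract (a minimal sketch of my own, not taken from the Hadoop source), a class with two serializable fields looks like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// A minimal Writable: write() serializes the fields to a byte stream and
// readFields() deserializes them back in the same order.
public class PersonWritable implements Writable {
    private String name;
    private int age;

    public PersonWritable() {} // Writables need a no-arg constructor for reflection

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }
}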

Hadoop RPC Demo

To use Hadoop RPC you first define an RPC protocol, which is simply a Java interface; its methods are the remote calls exposed to clients.

import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;

/**
 * Every custom RPC interface in Hadoop must extend the VersionedProtocol interface.
 */
public interface MyClientProtocol extends VersionedProtocol {
    // Protocol version; by default, RPC clients and servers with different
    // version numbers cannot talk to each other.
    public static final long versionID = 1L;
    public String echo(String str) throws IOException;
    public int add(int a, int b) throws IOException;
}

Next, declare a class that implements this protocol:

import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;

public class MyClientProtocolImpl implements MyClientProtocol {

    // Concrete implementations of the methods the RPC protocol exposes
    @Override
    public String echo(String str) throws IOException {
        return str;
    }

    @Override
    public int add(int a, int b) throws IOException {
        return a + b;
    }

    // The two methods below are overridden from VersionedProtocol
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return MyClientProtocol.versionID;
    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
            int clientMethodsHash) throws IOException {
        return new ProtocolSignature(MyClientProtocol.versionID, null);
    }
}

With the RPC protocol in place, create a server and a client. First, build the server with Hadoop RPC:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyServer {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The RPC class produces a server; by default it uses WritableRpcEngine,
        // and RPC.setProtocolEngine can switch to another serialization engine.
        // The server object is obtained through the Builder pattern.
        RPC.Server server = new RPC.Builder(conf).setProtocol(MyClientProtocol.class)
                .setInstance(new MyClientProtocolImpl()).setBindAddress("127.0.0.1").setPort(9876)
                .setNumHandlers(10).build();
        server.start();
    }
}

Now the client can invoke the interface methods the RPC protocol exposes:

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        final Configuration conf = new Configuration();

        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 30000; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    MyClientProtocol proxy = null;
                    try {
                        // The name RPC.getProxy hints that the client is really a proxy:
                        // Hadoop RPC clients make remote calls through Java dynamic proxies.
                        proxy = (MyClientProtocol) RPC.getProxy(MyClientProtocol.class, MyClientProtocol.versionID,
                                new InetSocketAddress("127.0.0.1", 9876), conf);
                        int result = proxy.add(5, 6);
                        // System.out.println(result);
                        String echoResult = proxy.echo(result + ""); // just for string
                        // System.out.println(echoResult);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
            t.join();
        }
        System.out.println("end");
        System.out.println(System.currentTimeMillis() - startTime);
    }
}

For a plain demo the client does not need the for loop. I wrote it to test how different handler counts on the server affect performance, but the code above never showed the effect I was after. The reason is that calling t.join() immediately after t.start() makes the loop wait for each request to finish before issuing the next, so the requests are effectively sequential and the server's handler pool is never under concurrent load. A version that actually exercises the handlers starts all the threads first and joins them afterwards, as sketched below.
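A corrected loop might look like this (same hypothetical demo setup as above; assumes java.util.ArrayList and java.util.List are imported):

// Start all request threads first, then join them, so many requests are
// in flight at once and the server's handler count actually matters.
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 30000; i++) {
    Thread t = new Thread(() -> {
        try {
            MyClientProtocol proxy = (MyClientProtocol) RPC.getProxy(
                    MyClientProtocol.class, MyClientProtocol.versionID,
                    new InetSocketAddress("127.0.0.1", 9876), conf);
            proxy.add(5, 6);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
    threads.add(t);
    t.start();
}
for (Thread t : threads) {
    t.join(); // wait for completion only after every thread has started
}

In practice a fixed-size ExecutorService would be a gentler way to generate this kind of load than 30000 raw threads.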

Hadoop RPC Source Code Analysis

The demo shows that only the RPC protocol is hand-written; both server and client are obtained through interfaces provided by the RPC class. Let's walk through the RPC-related source behind the demo.
The RPC-related classes live in the ipc package of hadoop-common; ipc stands for inter-process communication.
RPC mainly exposes programming interfaces for obtaining servers and clients, wrapping the underlying client-server network model.
A client can be obtained via two RPC methods, getProxy and waitForProxy. Here is the implementation of getProxy:

public static <T> T getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy();
}
....
// getProtocolProxy goes through a series of overloaded method calls and eventually lands here:
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException {
if(UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}

return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
}

Hadoop 2.0 added support for Protocol Buffers serialization, so the original Hadoop RPC was reworked around an RpcEngine interface that admits third-party serialization; Hadoop itself ships engines only for Protocol Buffers and Writable serialization.

static synchronized RpcEngine getProtocolEngine(Class<?> protocol, Configuration conf) {
RpcEngine engine = (RpcEngine)PROTOCOL_ENGINES.get(protocol);
// Hadoop defaults to Writable serialization
if(engine == null) {
Class impl = conf.getClass("rpc.engine." + protocol.getName(), WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
PROTOCOL_ENGINES.put(protocol, engine);
}

return engine;
}
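To switch a protocol to the Protocol Buffers engine, it should be enough to set this conf key before building the server or proxy; a sketch, assuming the Hadoop 2.x API (note that ProtobufRpcEngine also expects the protocol to be defined via protobuf service stubs, so this shows the configuration hook only, not a drop-in change for the Writable demo above):

Configuration conf = new Configuration();
// Equivalent to conf.setClass("rpc.engine." + MyClientProtocol.class.getName(),
//                             ProtobufRpcEngine.class, RpcEngine.class)
RPC.setProtocolEngine(conf, MyClientProtocol.class, ProtobufRpcEngine.class);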

Next the getProxy method of the chosen RpcEngine is called; take WritableRpcEngine as an example:

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth) throws IOException {
if(connectionRetryPolicy != null) {
throw new UnsupportedOperationException("Not supported: connectionRetryPolicy=" + connectionRetryPolicy);
} else {
// static Object newProxyInstance(ClassLoader loader, Class[] interfaces, InvocationHandler h)
// returns an instance of the proxy class; the returned proxy can be used as if it
// were the proxied object (it exposes the methods declared in the protocol interface).
// InvocationHandler plays the proxy role: the actual dispatch happens in its invoke method
Object proxy = Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, new WritableRpcEngine.Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, fallbackToSimpleAuth));
return new ProtocolProxy(protocol, proxy, true);
}
}
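As a standalone refresher on the JDK dynamic-proxy mechanism at work here (a toy example of my own, not Hadoop code):

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    interface Greeter {
        String greet(String name);
    }

    public static void main(String[] args) {
        // Every call on the proxy is routed to the InvocationHandler's invoke();
        // Hadoop's Invoker does the same, except it ships the call over the network.
        Greeter g = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] a) {
                        return "intercepted " + method.getName() + "(" + a[0] + ")";
                    }
                });
        System.out.println(g.greet("world")); // prints: intercepted greet(world)
    }
}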

The InvocationHandler passed to Proxy.newProxyInstance is WritableRpcEngine's inner class Invoker, whose constructor is:

public Invoker(Class<?> protocol, InetSocketAddress address, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, AtomicBoolean fallbackToSimpleAuth) throws IOException {
this.remoteId = ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf);
this.client = WritableRpcEngine.CLIENTS.getClient(conf, factory);
this.fallbackToSimpleAuth = fallbackToSimpleAuth;
}

When the client invokes a protocol method on the proxy instance (proxy.add(5, 6) in the demo), the invoke method of WritableRpcEngine.Invoker runs:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
ObjectWritable value;
try {
value = (ObjectWritable)this.client.call(RpcKind.RPC_WRITABLE, new WritableRpcEngine.Invocation(method, args), this.remoteId, this.fallbackToSimpleAuth);
} finally {
if(traceScope != null) {
traceScope.close();
}
}
...
return value.get();
}

This calls the call method of the Client class. WritableRpcEngine.Invocation implements the Writable interface; its job here is to serialize the method and args. The client field was instantiated in Invoker's constructor; call is implemented as follows:

public Writable call(RpcKind rpcKind, Writable rpcRequest, Client.ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
// Wrap the remote-call information in a Client.Call object; every Call has a unique callId
Client.Call call = this.createCall(rpcKind, rpcRequest);
// Create (or reuse) a connection for remoteId and put the call into that connection's calls hashtable
Client.Connection connection = this.getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth);
...
}

call first wraps the remote-call information in a Client.Call object, then obtains a connection via getConnection, placing the wrapped call into the connection's calls hashtable:

private Client.Connection getConnection(Client.ConnectionId remoteId, Client.Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
if(!this.running.get()) {
throw new IOException("The client is stopped");
} else {
Client.Connection connection;
do {
Hashtable var6 = this.connections; // connections is a Hashtable
synchronized(this.connections) {
// Look the connection up in connections first; create it only if absent
connection = (Client.Connection)this.connections.get(remoteId);
if(connection == null) {
// new here only assigns a few fields; no real connection is established yet
connection = new Client.Connection(remoteId, serviceClass);
this.connections.put(remoteId, connection);
}
}
} while(!connection.addCall(call)); // with the connection in hand, add the call to its calls hashtable
// establish the connection
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
}

The connection is established in setupIOstreams:

/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
private synchronized void setupIOstreams(
AtomicBoolean fallbackToSimpleAuth) {
if (socket != null || shouldCloseConnection.get()) {
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to "+server);
}
if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connecting to " + server);
}
short numRetries = 0;
Random rand = null;
while (true) {
// set up the socket connection: evidently the client communicates over sockets
setupConnection();
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
// Write the connection header - this is sent when connection is established
writeConnectionHeader(outStream);
if (authProtocol == AuthProtocol.SASL) {
...
}
// send ping messages;
// The time after which a RPC will timeout.
// If ping is not enabled (via ipc.client.ping), then the timeout value is the
// same as the pingInterval.
// If ping is enabled, then there is no timeout value.
if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));

// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
}
this.out = new DataOutputStream(outStream);
// write the connection context; the serialization protocol of connectionContextHeader is hard-coded here, not sure why??
// RpcRequestHeaderProto connectionContextHeader = ProtoUtil
// .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
// OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
// RpcConstants.INVALID_RETRY_COUNT, clientId);
// how this method differs from writeConnectionHeader() is still unclear to me...
writeConnectionContext(remoteId, authMethod);

// update last activity time
touch();

if (Trace.isTracing()) {
Trace.addTimelineAnnotation("IPC client connected to " + server);
}

// start the receiver thread after the socket connection has been set
// up
start(); // start the connection thread, which waits to receive the server's responses
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {
markClosed((IOException)t);
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();
}
}

The socket connection itself is made in setupConnection:

private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
// create a network socket
this.socket = socketFactory.createSocket();
this.socket.setTcpNoDelay(tcpNoDelay);
this.socket.setKeepAlive(true);

/*
* Bind the socket to the host specified in the principal name of the
* client, to ensure Server matching address of the client connection
* to host name in principal passed.
*/
UserGroupInformation ticket = remoteId.getTicket();
if (ticket != null && ticket.hasKerberosCredentials()) {
KerberosInfo krbInfo =
remoteId.getProtocol().getAnnotation(KerberosInfo.class);
if (krbInfo != null && krbInfo.clientPrincipal() != null) {
String host =
SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());

// If host name is a valid local address then bind socket to it
InetAddress localAddr = NetUtils.getLocalInetAddress(host);
if (localAddr != null) {
this.socket.bind(new InetSocketAddress(localAddr, 0));
}
}
}

// server is assigned in the Connection constructor:
// this.server = remoteId.getAddress();
// connect binds the socket to that address and connects
NetUtils.connect(this.socket, server, connectionTimeout);
if (rpcTimeout > 0) {
pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
}
this.socket.setSoTimeout(pingInterval);
return;
} catch (ConnectTimeoutException toe) {
...
} catch (IOException ie) {
...
}
}
}

Once the connection is established, the connection thread starts; its run method waits for responses from the server:

public void run() {
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());

try {
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
}
} catch (Throwable t) {
// This truly is unexpected, since we catch IOException in receiveResponse
// -- this is only to be really sure that we don't leave a client hanging
// forever.
LOG.warn("Unexpected error reading responses on connection " + this, t);
markClosed(new IOException("Error reading responses", t));
}

close();

if (LOG.isDebugEnabled())
LOG.debug(getName() + ": stopped, remaining connections "
+ connections.size());
}

That wraps up getConnection. Back in call, the key line is connection.sendRpcRequest(call), which sends the call object to the server and then blocks waiting for the server's response:

public Writable call(RpcKind rpcKind, Writable rpcRequest, Client.ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
...
try {
// send the remote-call information to the server
connection.sendRpcRequest(call);
} catch (RejectedExecutionException var13) {
throw new IOException("connection has been closed", var13);
} catch (InterruptedException var14) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", var14);
throw new IOException(var14);
}

boolean interrupted = false;
synchronized(call) {
// loop until the call is done, waiting for the server-side notify
while(!call.done) {
try {
call.wait(); // the current thread blocks here, waiting for receiveRpcResponse in the Connection thread to call call.notify
} catch (InterruptedException var12) {
interrupted = true;
}
}

if(interrupted) {
Thread.currentThread().interrupt();
}

if(call.error != null) {
...
} else {
// fetch the server's result
return call.getRpcResponse();
}
}
}
...
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
}

// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,

// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);

synchronized (sendRpcRequestLock) {
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}

if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);

byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
});

try {
// wait until the call has been fully sent before leaving this method and resuming in call
senderFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();

// cause should only be a RuntimeException as the Runnable above
// catches IOException
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("unexpected checked exception", cause);
}
}
}
}

Now for the server side. The server is constructed by RPC.Builder's build method:

public Server build() throws IOException, HadoopIllegalArgumentException {
...
return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
...
public RPC.Server getServer(Class<?> protocolClass,
Object protocolImpl, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
portRangeConfig);
}
...
// the Server constructor chains down to the ipc.Server constructor
protected Server(String bindAddress, int port,
Class<? extends Writable> rpcRequestClass, int handlerCount,
int numReaders, int queueSizePerHandler, Configuration conf,
String serverName, SecretManager<? extends TokenIdentifier> secretManager,
String portRangeConfig)
throws IOException {
...
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

// Create the responder here
responder = new Responder();

if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
}

this.exceptionsHandler.addTerseExceptions(StandbyException.class);
}

The ipc.Server constructor instantiates two threads, Listener and Responder. First, the Listener constructor:

public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);

// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
// create the reader array; its default length is 1
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}

// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}

The Server has exactly one Listener thread, which uses Java NIO to listen for incoming client connections; it also initializes the readers array of Reader threads, whose main job is to deserialize the data and build the server-side Call objects. The Listener registers for SelectionKey.OP_ACCEPT events, the Readers for SelectionKey.OP_READ.
The Server also instantiates a Responder thread, whose constructor merely obtains a Selector; the Listener and Readers each hold their own Selector as well, used to select the events they care about.

Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}

Server initialization is now complete; calling start launches the server, starting its responder and listener threads as well as the handlers:

public synchronized void start() {
responder.start();
listener.start();
// start the Handler threads; their count was set via setNumHandlers when the Server was built
handlers = new Handler[handlerCount];

for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}

First, the listener's run method:

public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this); // ThreadLocal<Server> SERVER, a thread-local variable
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
// obtain the selector
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
// a SelectionKey.OP_ACCEPT event is handled by doAccept
doAccept(key);
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
...
}

The Listener uses its selector to watch for SelectionKey.OP_ACCEPT events; getConnection inside the client's Client.call triggers this event, and when it fires, doAccept runs:

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {

channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(true);
// once the Listener has accepted the client connection, it hands it to a Reader,
// picked from the readers array
Reader reader = getReader();
// the server-side Connection is not a thread; note the difference from the
// client's Connection. Here a Connection is essentially per-client state
Connection c = connectionManager.register(channel);
// mark the current Connection as established; closeCurrentConnection can later close it
key.attach(c); // so closeCurrentConnection can get the object
// put the Connection into Reader.pendingConnections, where the Reader will pick it up
reader.addConnection(c);
}
}

The code of reader.addConnection(c):

public void addConnection(Connection conn) throws InterruptedException {
pendingConnections.put(conn);
// wakeup makes the thread break out of its blocking select() call,
// because pendingConnections now holds a new Connection to register
readSelector.wakeup();
}

Once the thread breaks out of select(), Reader.run() continues:

public void run() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
}
}
}

private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
// pendingConnections is a BlockingQueue
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
// this call blocks; readSelector.wakeup() breaks it out
readSelector.select();

Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
}
} catch (IOException ex) {
LOG.error("Error in Reader", ex);
}
}
}

void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
if (c == null) {
return;
}
c.setLastContact(Time.now());

try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
// a WrappedRpcServerException is an exception that has been sent
// to the client, so the stacktrace is unnecessary; any other
// exceptions are unexpected internal server errors and thus the
// stacktrace should be logged
LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " +
c.getHostAddress() + " threw exception [" + e + "]",
(e instanceof WrappedRpcServerException) ? null : e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
closeConnection(c);
c = null;
}
else {
c.setLastContact(Time.now());
}
}

public int readAndProcess()
throws WrappedRpcServerException, IOException, InterruptedException {
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count = -1;
// this.dataLengthBuffer = ByteBuffer.allocate(4)
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
// read the RPC connection header, which has four parts:
/**
* Write the connection header - this is sent when connection is established
* +----------------------------------+
* | "hrpc" 4 bytes | hrpc 在上面的if语句中读取
* +----------------------------------+
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);
}
// read out Version, Service Class, and AuthProtocol
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
int version = connectionHeaderBuf.get(0);
// TODO we should add handler for service class later
this.setServiceClass(connectionHeaderBuf.get(1));
dataLengthBuffer.flip();

// Check if it looks like the user is hitting an IPC port
// with an HTTP GET - this is a common error, so we can
// send back a simple string indicating as much.
if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
setupHttpRequestOnIpcPortResponse();
return -1;
}

if (!RpcConstants.HEADER.equals(dataLengthBuffer)
|| version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
setupBadVersionResponse(version);
return -1;
}

// this may switch us into SIMPLE
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));

dataLengthBuffer.clear();
connectionHeaderBuf = null;
connectionHeaderRead = true;
// the connection header has been read; continue with the next loop iteration
continue;
}
// read the body of the rpc request
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
checkDataLength(dataLength);
data = ByteBuffer.allocate(dataLength);
}

count = channelRead(channel, data);

if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
// authorize first, then read the payload
processOneRpc(data.array());
data = null;
if (!isHeaderRead) {
// authorization happens before the request body is read; once it succeeds, connectionContextRead becomes true
continue;
}
}
return count;
}
}

private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
checkRpcHeaders(header);
// on the first connection the callId is CONNECTION_CONTEXT_CALL_ID, whose value is
// -3: handle authorization, and on success set connectionContextRead to true
if (callId < 0) { // callIds typically used during connection setup
processRpcOutOfBandRequest(header, dis);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
// processing logic for subsequent requests
processRpcRequest(header, dis);
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
final Call call = new Call(callId, retry, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
responder.doRespond(call);
throw wrse;
}
}

private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
...
Writable rpcRequest;
try { //Read the rpc request
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
// deserialize: read the data from the Writable stream
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance
...
}
...
// build the server-side Call object and put it on the callQueue to await a handler
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);

callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}

At this point the server's request-acceptance stage is complete; next the handlers take calls from the shared callQueue and process them. There are multiple Handler threads, so let's look at Handler's run method:

public void run() {
LOG.debug(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
TraceScope traceScope = null;
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
}
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
String errorClass = null;
String error = null;
RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
RpcErrorCodeProto detailedErr = null;
Writable value = null;

CurCall.set(call);
if (call.traceSpan != null) {
traceScope = Trace.continueSpan(call.traceSpan);
}

try {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject
if (call.connection.user == null) {
// process the call object
value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest,
call.timestamp);
} else {
value =
call.connection.user.doAs
(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);

}
}
);
}
} catch (Throwable e) {
...
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
// setupResponse() needs to be sync'ed together with
// responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces
// its own message ordering.
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);

// Discard the large buf and reset it back to smaller size
// to free up heap
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call "
+ call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
// after processing, put the call into the responseQueue
responder.doRespond(call);
}
} catch (InterruptedException e) {
...
} catch (Exception e) {
...
} finally {
if (traceScope != null) {
traceScope.close();
}
IOUtils.cleanup(LOG, traceScope);
}
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}

The line in the handler that actually processes the call is value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); it is implemented in RPC.Server.call:

public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
// getRpcInvoker looks up the invoker registered for this RpcKind's serialization engine
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}

// the implementation of call in WritableRpcEngine.Server
public Writable call(org.apache.hadoop.ipc.RPC.Server server,
String protocolName, Writable rpcRequest, long receivedTime)
throws IOException, RPC.VersionMismatch {
...
// Invoke the protocol method
long startTime = Time.now();
int qTime = (int) (startTime-receivedTime);
Exception exception = null;
try {
// obtain the Method
Method method =
protocolImpl.protocolClass.getMethod(call.getMethodName(),
call.getParameterClasses());
method.setAccessible(true);
server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
// invoke the method on the protocol's implementation class via reflection
Object value =
method.invoke(protocolImpl.protocolImpl, call.getParameters());
if (server.verbose) log("Return: "+value);
return new ObjectWritable(method.getReturnType(), value);

} catch (InvocationTargetException e) {
...
}
...
}

After call returns, execution continues in Handler's run method, which hands the call to responder.doRespond:

void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
// append the call to the responseQueue; every connection has its own responseQueue
call.connection.responseQueue.addLast(call);
// if the responseQueue now has size 1, the Handler thread sends the response itself;
// if it is larger than 1, the Responder thread takes over
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
}
}

The request has now been processed. If the responseQueue has size 1, the Handler calls processResponse directly to return the result; if it is larger than 1, the Responder thread returns the results. First, its run method:

public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
doRunLoop();
} finally {
...
}
}

private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls.

while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
// judging by the name this is asynchronous handling, but how is that reflected here?
doAsyncWrite(key);
}
} catch (IOException e) {
LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = Time.now();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not been sent out for a
// long time, discard them.
//
if(LOG.isDebugEnabled()) {
LOG.debug("Checking for old call responses.");
}
ArrayList<Call> calls;

// get the list of channels from list of keys.
// get the set of all registered keys
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call)key.attachment();
// what does this list hold???? calls that were not fully sent and re-entered the responseQueue
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}

for(Call call : calls) {
doPurge(call, now);
}
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn("Out of Memory in server select", e);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
LOG.warn("Exception in Responder", e);
}
}
}

private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment();
if (call == null) {
return;
}
if (key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}

synchronized(call.connection.responseQueue) {
if (processResponse(call.connection.responseQueue, false)) {
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
/* The Listener/reader might have closed the socket.
* We don't explicitly cancel the key, so not sure if this will
* ever fire.
* This warning could be removed.
*/
LOG.warn("Exception while changing ops : " + e);
}
}
}
}

// Processes one response. Returns true if there are no more pending
// data for this channel.
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, then we are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
// Extract the first call
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
}
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
// if call.rpcResponse has been fully written, clear it;
// if not, put the call back at the head of the responseQueue.
// When a Handler cannot send the whole result to the client in one shot,
// it registers a SelectionKey.OP_WRITE event with this Selector
if (!call.rpcResponse.hasRemaining()) {
//Clear out the response buffer so it can be collected
call.rpcResponse = null;
call.connection.decRpcCount();
if (numElements == 1) { // last call fully processes.
done = true; // no more data for this channel.
} else {
done = false; // more calls pending to be sent.
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote " + numBytes + " bytes.");
}
} else {
//
// If we were unable to write the entire response out, then
// insert in Selector queue.
//
call.connection.responseQueue.addFirst(call);

if (inHandler) {
// set the serve time when the response has to be sent later
call.timestamp = Time.now();

incPending();
try {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
} catch (ClosedChannelException e) {
//Its ok. channel might be closed else where.
done = true;
} finally {
decPending();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + ": responding to " + call
+ " Wrote partial " + numBytes + " bytes.");
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}

That concludes the server's result-returning path. Back on the client side, the Connection thread receives the server's response; the code is receiveRpcResponse, called from Connection's run method:

/* Receive a response.
* Because only one receiver, so no synchronization on in.
*/
private void receiveRpcResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();

try {
int totalLen = in.readInt();
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
checkResponse(header);

int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

int callId = header.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);

Call call = calls.get(callId);
RpcStatusProto status = header.getStatus();
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
calls.remove(callId);
// set value as the call's response
call.setRpcResponse(value);

// verify that length was correct
// only for ProtobufEngine where len can be verified easily
if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
if (totalLen != headerLen + resWrapper.getLength()) {
throw new RpcClientException(
"RPC response length mismatch on rpc success");
}
}else { // Rpc Request failed
...
}
}
} catch (IOException e) {
markClosed(e);
}
}
public synchronized void setRpcResponse(Writable rpcResponse) {
this.rpcResponse = rpcResponse;
callComplete();
}

protected synchronized void callComplete() {
this.done = true;
// notify the thread that called call.wait() after connection.sendRpcRequest in call
notify(); // notify caller
}

After notify, control returns to the thread blocked in Client.call, which executes call.getRpcResponse():

public synchronized Writable getRpcResponse() {
return rpcResponse;
}
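The caller/Connection handoff is the classic wait/notify pattern; a stripped-down model of it (my own sketch, not the Hadoop source):

// Minimal model of Client.Call's handshake: the caller blocks in get()
// until the receiver thread delivers a value and calls notify().
public class ToyCall {
    private Object response;
    private boolean done;

    public synchronized Object get() throws InterruptedException {
        while (!done) {
            wait(); // the caller thread parks here, like call.wait() in Client.call
        }
        return response;
    }

    public synchronized void complete(Object value) {
        this.response = value;
        this.done = true;
        notify(); // wake the caller, like callComplete() does
    }
}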

Summary

With the code analysis done, here is a prose summary of the overall Hadoop RPC flow:

Client: the flow after the proxy's invoke calls client.call

  1. The remote-call information is wrapped in a Call object; the client checks whether a connection to the server already exists, creating a Connection object if not and reusing the existing one otherwise, then puts the Call object into the Connection's calls hashtable;
  2. sendRpcRequest() in the Connection class sends the Call object to the server, and the calling thread blocks in call.wait();
  3. After the server finishes processing the RPC request, it sends the result back over the network; on the client, the Connection thread obtains it in receiveRpcResponse();
  4. The client checks the result's status (success or failure), removes the Call object from the calls hashtable, and assigns the result to call.rpcResponse.

Server

  1. Accepting requests. This stage receives RPC requests from clients, wraps them in a fixed format (the Call class), and places them on a shared queue (callQueue) for later processing (a minimal Reactor sketch follows this list).
    It breaks down into two sub-stages, connection establishment and request reception, handled by the Listener and Reader threads respectively. The Server has a single Listener thread, solely responsible for listening for client connection requests; when a new one arrives, it picks a Reader thread from the pool in round-robin fashion. Multiple Reader threads may exist at once, each receiving the RPC requests of its share of client connections; which connections a Reader handles is entirely up to the Listener, which currently uses a simple round-robin assignment. The Listener and each Reader hold their own Selector, listening for SelectionKey.OP_ACCEPT and SelectionKey.OP_READ events respectively. The Listener's main loop watches for new connection requests and round-robins each to a Reader; a Reader's main loop watches its connections for new RPC requests, wraps each into a Call object, and puts it on the shared callQueue.

  2. Processing requests. This stage takes Call objects off the shared callQueue, performs the corresponding function calls, and returns the results to the clients; it is carried out entirely by Handler threads. The Server can run many Handler threads in parallel; each reads Call objects from the shared queue, performs the function call, and then tries to send the result straight back to the client. When a call's result is very large or the network is slow, the result may not go out in one shot, in which case the Handler hands the remaining send work to the Responder thread.

  3. Returning results. As noted above, each Handler tries to return the result itself after the call completes, but in special cases, such as an oversized result or a slow network, it delegates the send to the Responder thread. The Server has exactly one Responder thread, holding a Selector that listens for SelectionKey.OP_WRITE events. When a Handler cannot send a complete result in one attempt, it registers SelectionKey.OP_WRITE with that Selector, and the Responder thread then finishes sending the remaining data asynchronously.
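To make the Listener/Reader/Responder division concrete, here is a minimal single-threaded Reactor echo server in Java NIO (my own sketch; the real Hadoop server splits accept/read/write across the separate threads described above):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// One selector drives everything: OP_ACCEPT plays the Listener's role,
// OP_READ the Reader's, and OP_WRITE the Responder's.
public class MiniReactorEchoServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel acceptChannel = ServerSocketChannel.open();
        acceptChannel.bind(new InetSocketAddress(9876));
        acceptChannel.configureBlocking(false);
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {          // "Listener": accept the connection
                    SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                    if (ch == null) continue;
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                } else if (key.isReadable()) {     // "Reader": read the request
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    if (ch.read(buf) < 0) { ch.close(); continue; }
                    buf.flip();
                    key.interestOps(SelectionKey.OP_WRITE); // hand off to the "Responder"
                } else if (key.isWritable()) {     // "Responder": write the reply back
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = (ByteBuffer) key.attachment();
                    ch.write(buf);
                    if (!buf.hasRemaining()) {     // reply fully sent; go back to reading
                        buf.clear();
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        }
    }
}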

References

http://jomolangma.com/?p=130