1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.EpollEventLoopGroup;
26 import io.netty.channel.epoll.EpollSocketChannel;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29 import io.netty.channel.socket.nio.NioSocketChannel;
30 import io.netty.util.HashedWheelTimer;
31 import io.netty.util.Timeout;
32 import io.netty.util.TimerTask;
33 import io.netty.util.concurrent.Future;
34 import io.netty.util.concurrent.GenericFutureListener;
35 import io.netty.util.concurrent.Promise;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.hbase.CellScanner;
48 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.ServerName;
51 import org.apache.hadoop.hbase.classification.InterfaceAudience;
52 import org.apache.hadoop.hbase.security.User;
53 import org.apache.hadoop.hbase.util.JVM;
54 import org.apache.hadoop.hbase.util.Pair;
55 import org.apache.hadoop.hbase.util.PoolMap;
56 import org.apache.hadoop.hbase.util.Threads;
57
58 import com.google.common.annotations.VisibleForTesting;
59 import com.google.protobuf.Descriptors;
60 import com.google.protobuf.Message;
61 import com.google.protobuf.RpcCallback;
62 import com.google.protobuf.RpcChannel;
63 import com.google.protobuf.RpcController;
64
65
66
67
68 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
69 public class AsyncRpcClient extends AbstractRpcClient {
70
71 public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
72 public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
73 public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
74
75 private static final HashedWheelTimer WHEEL_TIMER =
76 new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
77 100, TimeUnit.MILLISECONDS);
78
79 private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
80 new ChannelInitializer<SocketChannel>() {
81 @Override
82 protected void initChannel(SocketChannel ch) throws Exception {
83
84 }
85 };
86
87 protected final AtomicInteger callIdCnt = new AtomicInteger();
88
89 private final PoolMap<Integer, AsyncRpcChannel> connections;
90
91 final FailedServers failedServers;
92
93 @VisibleForTesting
94 final Bootstrap bootstrap;
95
96 private final boolean useGlobalEventLoopGroup;
97
98 @VisibleForTesting
99 static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
100
101 private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
102 getGlobalEventLoopGroup(Configuration conf) {
103 if (GLOBAL_EVENT_LOOP_GROUP == null) {
104 GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("Create global event loop group "
107 + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
108 }
109 }
110 return GLOBAL_EVENT_LOOP_GROUP;
111 }
112
113 private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
114 Configuration conf) {
115
116 int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
117
118
119
120 boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
121
122
123 if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
126 }
127 return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
128 Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
129 } else {
130 if (LOG.isDebugEnabled()) {
131 LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
132 }
133 return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
134 Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
135 }
136 }
137
138
139
140
141
142
143
144
145
146 @VisibleForTesting
147 AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
148 ChannelInitializer<SocketChannel> channelInitializer) {
149 super(configuration, clusterId, localAddress);
150
151 if (LOG.isDebugEnabled()) {
152 LOG.debug("Starting async Hbase RPC client");
153 }
154
155 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
156 this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
157 if (useGlobalEventLoopGroup) {
158 eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
159 } else {
160 eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
161 }
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
164 + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
165 }
166
167 this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
168 this.failedServers = new FailedServers(configuration);
169
170 int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
171 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
172
173
174 this.bootstrap = new Bootstrap();
175 bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
176 .channel(eventLoopGroupAndChannelClass.getSecond())
177 .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
178 .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
179 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
180 if (channelInitializer == null) {
181 channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
182 }
183 bootstrap.handler(channelInitializer);
184 if (localAddress != null) {
185 bootstrap.localAddress(localAddress);
186 }
187 }
188
189
190
191
192
193
194
195
/**
 * Creates a client that uses the default (empty) channel initializer.
 *
 * @param configuration source of the client settings
 * @param clusterId passed through to the superclass
 * @param localAddress local address to bind to, or {@code null} to leave it unset
 */
public AsyncRpcClient(
    Configuration configuration,
    String clusterId,
    SocketAddress localAddress) {
  // A null initializer makes the delegate constructor fall back to the default one.
  this(configuration, clusterId, localAddress, null);
}
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 @Override
215 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
216 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
217 InetSocketAddress addr) throws IOException, InterruptedException {
218 if (pcrc == null) {
219 pcrc = new PayloadCarryingRpcController();
220 }
221 final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
222
223 Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
224 long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
225 try {
226 Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
227 return new Pair<>(response, pcrc.cellScanner());
228 } catch (ExecutionException e) {
229 if (e.getCause() instanceof IOException) {
230 throw (IOException) e.getCause();
231 } else {
232 throw wrapException(addr, (Exception) e.getCause());
233 }
234 } catch (TimeoutException e) {
235 CallTimeoutException cte = new CallTimeoutException(promise.toString());
236 throw wrapException(addr, cte);
237 }
238 }
239
240
241
242
243 private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc,
244 Message param, Message returnType, User ticket, InetSocketAddress addr,
245 final RpcCallback<Message> done) {
246 final AsyncRpcChannel connection;
247 try {
248 connection = createRpcChannel(md.getService().getName(), addr, ticket);
249
250 connection.callMethod(md, pcrc, param, returnType).addListener(
251 new GenericFutureListener<Future<Message>>() {
252 @Override
253 public void operationComplete(Future<Message> future) throws Exception {
254 if(!future.isSuccess()){
255 Throwable cause = future.cause();
256 if (cause instanceof IOException) {
257 pcrc.setFailed((IOException) cause);
258 }else{
259 pcrc.setFailed(new IOException(cause));
260 }
261 }else{
262 try {
263 done.run(future.get());
264 }catch (ExecutionException e){
265 Throwable cause = e.getCause();
266 if (cause instanceof IOException) {
267 pcrc.setFailed((IOException) cause);
268 }else{
269 pcrc.setFailed(new IOException(cause));
270 }
271 }catch (InterruptedException e){
272 pcrc.setFailed(new IOException(e));
273 }
274 }
275 }
276 });
277 } catch (StoppedRpcClientException|FailedServerException e) {
278 pcrc.setFailed(e);
279 }
280 }
281
282 private boolean closed = false;
283
284
285
286
287 public void close() {
288 if (LOG.isDebugEnabled()) {
289 LOG.debug("Stopping async HBase RPC client");
290 }
291
292 synchronized (connections) {
293 if (closed) {
294 return;
295 }
296 closed = true;
297 for (AsyncRpcChannel conn : connections.values()) {
298 conn.close(null);
299 }
300 }
301
302 if (!useGlobalEventLoopGroup) {
303 bootstrap.group().shutdownGracefully();
304 }
305 }
306
307
308
309
310
311
312
313
314 public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
315 return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
316 }
317
318
319
320
321
322
323
324
325 public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
326 return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
327 }
328
329
330
331
332
333
334
335
336
337
338
339 private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
340 User ticket) throws StoppedRpcClientException, FailedServerException {
341
342 if (this.failedServers.isFailedServer(location)) {
343 if (LOG.isDebugEnabled()) {
344 LOG.debug("Not trying to connect to " + location +
345 " this server is in the failed servers list");
346 }
347 throw new FailedServerException(
348 "This server is in the failed servers list: " + location);
349 }
350
351 int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
352
353 AsyncRpcChannel rpcChannel;
354 synchronized (connections) {
355 if (closed) {
356 throw new StoppedRpcClientException();
357 }
358 rpcChannel = connections.get(hashCode);
359 if (rpcChannel == null || !rpcChannel.isAlive()) {
360 rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
361 connections.put(hashCode, rpcChannel);
362 }
363 }
364
365 return rpcChannel;
366 }
367
368
369
370
371
372
373
374
375
376
377
378 @Override
379 public void cancelConnections(ServerName sn) {
380 synchronized (connections) {
381 for (AsyncRpcChannel rpcChannel : connections.values()) {
382 if (rpcChannel.isAlive() &&
383 rpcChannel.address.getPort() == sn.getPort() &&
384 rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
385 LOG.info("The server on " + sn.toString() +
386 " is dead - stopping the connection " + rpcChannel.toString());
387 rpcChannel.close(null);
388 }
389 }
390 }
391 }
392
393
394
395
396 public void removeConnection(AsyncRpcChannel connection) {
397 int connectionHashCode = connection.hashCode();
398 synchronized (connections) {
399
400
401 AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
402 if (connectionInPool != null && connectionInPool.equals(connection)) {
403 this.connections.remove(connectionHashCode);
404 } else if (LOG.isDebugEnabled()) {
405 LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
406 connection.toString(), System.identityHashCode(connection),
407 System.identityHashCode(connectionInPool)));
408 }
409 }
410 }
411
412
413
414
415
416
417
418
419
420
421
422
423 public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
424 return new RpcChannelImplementation(this, sn, user, rpcTimeout);
425 }
426
427
428
429
430 @VisibleForTesting
431 public static class RpcChannelImplementation implements RpcChannel {
432 private final InetSocketAddress isa;
433 private final AsyncRpcClient rpcClient;
434 private final User ticket;
435 private final int channelOperationTimeout;
436
437
438
439
440 protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
441 final ServerName sn, final User ticket, int channelOperationTimeout) {
442 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
443 this.rpcClient = rpcClient;
444 this.ticket = ticket;
445 this.channelOperationTimeout = channelOperationTimeout;
446 }
447
448 @Override
449 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
450 Message param, Message returnType, RpcCallback<Message> done) {
451 PayloadCarryingRpcController pcrc;
452 if (controller != null) {
453 pcrc = (PayloadCarryingRpcController) controller;
454 if (!pcrc.hasCallTimeout()) {
455 pcrc.setCallTimeout(channelOperationTimeout);
456 }
457 } else {
458 pcrc = new PayloadCarryingRpcController();
459 pcrc.setCallTimeout(channelOperationTimeout);
460 }
461
462 this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
463 }
464 }
465
466 Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
467 return WHEEL_TIMER.newTimeout(task, delay, unit);
468 }
469 }