1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.EventLoopGroup;
25 import io.netty.channel.epoll.EpollEventLoopGroup;
26 import io.netty.channel.epoll.EpollSocketChannel;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.socket.SocketChannel;
29 import io.netty.channel.socket.nio.NioSocketChannel;
30 import io.netty.util.HashedWheelTimer;
31 import io.netty.util.Timeout;
32 import io.netty.util.TimerTask;
33 import io.netty.util.concurrent.Future;
34 import io.netty.util.concurrent.GenericFutureListener;
35 import io.netty.util.concurrent.Promise;
36
37 import java.io.IOException;
38 import java.net.InetSocketAddress;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.concurrent.atomic.AtomicInteger;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48
49 import org.apache.hadoop.conf.Configuration;
50 import org.apache.hadoop.hbase.CellScanner;
51 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
52 import org.apache.hadoop.hbase.HConstants;
53 import org.apache.hadoop.hbase.ServerName;
54 import org.apache.hadoop.hbase.classification.InterfaceAudience;
55 import org.apache.hadoop.hbase.client.MetricsConnection;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58 import org.apache.hadoop.hbase.util.JVM;
59 import org.apache.hadoop.hbase.util.Pair;
60 import org.apache.hadoop.hbase.util.PoolMap;
61 import org.apache.hadoop.hbase.util.Threads;
62
63 import com.google.common.annotations.VisibleForTesting;
64 import com.google.protobuf.Descriptors;
65 import com.google.protobuf.Message;
66 import com.google.protobuf.RpcCallback;
67 import com.google.protobuf.RpcChannel;
68 import com.google.protobuf.RpcController;
69
70
71
72
73 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
74 public class AsyncRpcClient extends AbstractRpcClient {
75
76 private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
77
78 public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
79 public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
80 public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
81
82 private static final HashedWheelTimer WHEEL_TIMER =
83 new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
84 100, TimeUnit.MILLISECONDS);
85
86 private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
87 new ChannelInitializer<SocketChannel>() {
88 @Override
89 protected void initChannel(SocketChannel ch) throws Exception {
90
91 }
92 };
93
94 protected final AtomicInteger callIdCnt = new AtomicInteger();
95
96 private final PoolMap<Integer, AsyncRpcChannel> connections;
97
98 final FailedServers failedServers;
99
100 @VisibleForTesting
101 final Bootstrap bootstrap;
102
103 private final boolean useGlobalEventLoopGroup;
104
105 @VisibleForTesting
106 static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
107
108 private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
109 getGlobalEventLoopGroup(Configuration conf) {
110 if (GLOBAL_EVENT_LOOP_GROUP == null) {
111 GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("Create global event loop group "
114 + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
115 }
116 }
117 return GLOBAL_EVENT_LOOP_GROUP;
118 }
119
120 private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
121 Configuration conf) {
122
123 int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
124
125
126
127 boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
128
129
130 if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
133 }
134 return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
135 Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
136 } else {
137 if (LOG.isDebugEnabled()) {
138 LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
139 }
140 return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
141 Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
142 }
143 }
144
145
146
147
148
149
150
151
152
153
154 protected AsyncRpcClient(Configuration configuration, String clusterId,
155 SocketAddress localAddress, MetricsConnection metrics,
156 ChannelInitializer<SocketChannel> channelInitializer) {
157 super(configuration, clusterId, localAddress, metrics);
158
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("Starting async Hbase RPC client");
161 }
162
163 Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
164 this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
165 if (useGlobalEventLoopGroup) {
166 eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
167 } else {
168 eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
169 }
170 if (LOG.isDebugEnabled()) {
171 LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
172 + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
173 }
174
175 this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
176 this.failedServers = new FailedServers(configuration);
177
178 int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
179 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
180
181
182 this.bootstrap = new Bootstrap();
183 bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
184 .channel(eventLoopGroupAndChannelClass.getSecond())
185 .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
186 .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
187 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
188 if (channelInitializer == null) {
189 channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
190 }
191 bootstrap.handler(channelInitializer);
192 if (localAddress != null) {
193 bootstrap.localAddress(localAddress);
194 }
195 }
196
197
/**
 * Package-private convenience constructor: default cluster id, no local address, no metrics.
 *
 * @param configuration source of the client settings
 */
AsyncRpcClient(Configuration configuration) {
  this(configuration,
      HConstants.CLUSTER_ID_DEFAULT,
      null,
      null);
}
201
202
/**
 * Package-private convenience constructor that also accepts a channel initializer.
 * Uses the default cluster id, no local address and no metrics.
 *
 * @param configuration source of the client settings
 * @param channelInitializer handler for new channels, or {@code null} for the empty default
 */
AsyncRpcClient(Configuration configuration,
    ChannelInitializer<SocketChannel> channelInitializer) {
  this(configuration,
      HConstants.CLUSTER_ID_DEFAULT,
      null,
      null,
      channelInitializer);
}
207
208
209
210
211
212
213
214
215
/**
 * Public constructor; delegates to the full constructor with no custom channel initializer.
 *
 * @param configuration source of the client settings
 * @param clusterId passed through to the superclass constructor
 * @param localAddress local address to bind outgoing sockets to, or {@code null} for none
 * @param metrics passed through to the superclass constructor
 */
public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
    MetricsConnection metrics) {
  this(configuration,
      clusterId,
      localAddress,
      metrics,
      null);
}
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 @Override
236 protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
237 Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
238 InetSocketAddress addr, MetricsConnection.CallStats callStats)
239 throws IOException, InterruptedException {
240 if (pcrc == null) {
241 pcrc = new PayloadCarryingRpcController();
242 }
243 final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
244
245 Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
246 long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
247 try {
248 Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
249 return new Pair<>(response, pcrc.cellScanner());
250 } catch (ExecutionException e) {
251 if (e.getCause() instanceof IOException) {
252 throw (IOException) e.getCause();
253 } else {
254 throw wrapException(addr, (Exception) e.getCause());
255 }
256 } catch (TimeoutException e) {
257 CallTimeoutException cte = new CallTimeoutException(promise.toString());
258 throw wrapException(addr, cte);
259 }
260 }
261
262
263
264
265 private void callMethod(final Descriptors.MethodDescriptor md,
266 final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
267 InetSocketAddress addr, final RpcCallback<Message> done) {
268 final AsyncRpcChannel connection;
269 try {
270 connection = createRpcChannel(md.getService().getName(), addr, ticket);
271 final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
272 GenericFutureListener<Future<Message>> listener =
273 new GenericFutureListener<Future<Message>>() {
274 @Override
275 public void operationComplete(Future<Message> future) throws Exception {
276 cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
277 if (metrics != null) {
278 metrics.updateRpc(md, param, cs);
279 }
280 if (LOG.isTraceEnabled()) {
281 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
282 }
283 if (!future.isSuccess()) {
284 Throwable cause = future.cause();
285 if (cause instanceof IOException) {
286 pcrc.setFailed((IOException) cause);
287 } else {
288 pcrc.setFailed(new IOException(cause));
289 }
290 } else {
291 try {
292 done.run(future.get());
293 } catch (ExecutionException e) {
294 Throwable cause = e.getCause();
295 if (cause instanceof IOException) {
296 pcrc.setFailed((IOException) cause);
297 } else {
298 pcrc.setFailed(new IOException(cause));
299 }
300 } catch (InterruptedException e) {
301 pcrc.setFailed(new IOException(e));
302 }
303 }
304 }
305 };
306 cs.setStartTime(EnvironmentEdgeManager.currentTime());
307 connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
308 } catch (StoppedRpcClientException|FailedServerException e) {
309 pcrc.setFailed(e);
310 }
311 }
312
313 private boolean closed = false;
314
315
316
317
318 public void close() {
319 if (LOG.isDebugEnabled()) {
320 LOG.debug("Stopping async HBase RPC client");
321 }
322
323 synchronized (connections) {
324 if (closed) {
325 return;
326 }
327 closed = true;
328 for (AsyncRpcChannel conn : connections.values()) {
329 conn.close(null);
330 }
331 }
332
333 if (!useGlobalEventLoopGroup) {
334 bootstrap.group().shutdownGracefully();
335 }
336 }
337
338
339
340
341
342
343
344
345 public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
346 return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
347 }
348
349
350
351
352
353
354
355
356 public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
357 return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
358 }
359
360
361
362
363
364
365
366
367
368
369
370 private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
371 User ticket) throws StoppedRpcClientException, FailedServerException {
372
373 if (this.failedServers.isFailedServer(location)) {
374 if (LOG.isDebugEnabled()) {
375 LOG.debug("Not trying to connect to " + location +
376 " this server is in the failed servers list");
377 }
378 throw new FailedServerException(
379 "This server is in the failed servers list: " + location);
380 }
381
382 int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
383
384 AsyncRpcChannel rpcChannel;
385 synchronized (connections) {
386 if (closed) {
387 throw new StoppedRpcClientException();
388 }
389 rpcChannel = connections.get(hashCode);
390 if (rpcChannel == null || !rpcChannel.isAlive()) {
391 rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
392 connections.put(hashCode, rpcChannel);
393 }
394 }
395
396 return rpcChannel;
397 }
398
399
400
401
402
403
404
405
406
407
408
409 @Override
410 public void cancelConnections(ServerName sn) {
411 synchronized (connections) {
412 for (AsyncRpcChannel rpcChannel : connections.values()) {
413 if (rpcChannel.isAlive() &&
414 rpcChannel.address.getPort() == sn.getPort() &&
415 rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
416 LOG.info("The server on " + sn.toString() +
417 " is dead - stopping the connection " + rpcChannel.toString());
418 rpcChannel.close(null);
419 }
420 }
421 }
422 }
423
424
425
426
427 public void removeConnection(AsyncRpcChannel connection) {
428 int connectionHashCode = connection.hashCode();
429 synchronized (connections) {
430
431
432 AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
433 if (connectionInPool != null && connectionInPool.equals(connection)) {
434 this.connections.remove(connectionHashCode);
435 } else if (LOG.isDebugEnabled()) {
436 LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
437 connection.toString(), System.identityHashCode(connection),
438 System.identityHashCode(connectionInPool)));
439 }
440 }
441 }
442
443
444
445
446
447
448
449
450
451
452
453 public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
454 return new RpcChannelImplementation(this, sn, user, rpcTimeout);
455 }
456
457
458
459
460 @VisibleForTesting
461 public static class RpcChannelImplementation implements RpcChannel {
462 private final InetSocketAddress isa;
463 private final AsyncRpcClient rpcClient;
464 private final User ticket;
465 private final int channelOperationTimeout;
466
467
468
469
470 protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
471 final ServerName sn, final User ticket, int channelOperationTimeout) {
472 this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
473 this.rpcClient = rpcClient;
474 this.ticket = ticket;
475 this.channelOperationTimeout = channelOperationTimeout;
476 }
477
478 @Override
479 public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
480 Message param, Message returnType, RpcCallback<Message> done) {
481 PayloadCarryingRpcController pcrc;
482 if (controller != null) {
483 pcrc = (PayloadCarryingRpcController) controller;
484 if (!pcrc.hasCallTimeout()) {
485 pcrc.setCallTimeout(channelOperationTimeout);
486 }
487 } else {
488 pcrc = new PayloadCarryingRpcController();
489 pcrc.setCallTimeout(channelOperationTimeout);
490 }
491
492 this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
493 }
494 }
495
496 Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
497 return WHEEL_TIMER.newTimeout(task, delay, unit);
498 }
499 }