View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.EventLoopGroup;
25  import io.netty.channel.epoll.EpollEventLoopGroup;
26  import io.netty.channel.epoll.EpollSocketChannel;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.socket.SocketChannel;
29  import io.netty.channel.socket.nio.NioSocketChannel;
30  import io.netty.util.HashedWheelTimer;
31  import io.netty.util.Timeout;
32  import io.netty.util.TimerTask;
33  import io.netty.util.concurrent.Future;
34  import io.netty.util.concurrent.GenericFutureListener;
35  import io.netty.util.concurrent.Promise;
36  
37  import java.io.IOException;
38  import java.net.InetSocketAddress;
39  import java.net.SocketAddress;
40  import java.nio.ByteBuffer;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.TimeoutException;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.hbase.CellScanner;
51  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.ServerName;
54  import org.apache.hadoop.hbase.classification.InterfaceAudience;
55  import org.apache.hadoop.hbase.client.MetricsConnection;
56  import org.apache.hadoop.hbase.security.User;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
58  import org.apache.hadoop.hbase.util.JVM;
59  import org.apache.hadoop.hbase.util.Pair;
60  import org.apache.hadoop.hbase.util.PoolMap;
61  import org.apache.hadoop.hbase.util.Threads;
62  
63  import com.google.common.annotations.VisibleForTesting;
64  import com.google.protobuf.Descriptors;
65  import com.google.protobuf.Message;
66  import com.google.protobuf.RpcCallback;
67  import com.google.protobuf.RpcChannel;
68  import com.google.protobuf.RpcController;
69  
70  /**
71   * Netty client for the requests and responses
72   */
73  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
74  public class AsyncRpcClient extends AbstractRpcClient {
75  
76    private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
77  
78    public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
79    public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
80    public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
81  
82    private static final HashedWheelTimer WHEEL_TIMER =
83        new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
84            100, TimeUnit.MILLISECONDS);
85  
86    private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
87        new ChannelInitializer<SocketChannel>() {
88      @Override
89      protected void initChannel(SocketChannel ch) throws Exception {
90        //empty initializer
91      }
92    };
93  
94    protected final AtomicInteger callIdCnt = new AtomicInteger();
95  
96    private final PoolMap<Integer, AsyncRpcChannel> connections;
97  
98    final FailedServers failedServers;
99  
100   @VisibleForTesting
101   final Bootstrap bootstrap;
102 
103   private final boolean useGlobalEventLoopGroup;
104 
105   @VisibleForTesting
106   static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
107 
108   private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
109       getGlobalEventLoopGroup(Configuration conf) {
110     if (GLOBAL_EVENT_LOOP_GROUP == null) {
111       GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
112       if (LOG.isDebugEnabled()) {
113         LOG.debug("Create global event loop group "
114             + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
115       }
116     }
117     return GLOBAL_EVENT_LOOP_GROUP;
118   }
119 
120   private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
121       Configuration conf) {
122     // Max amount of threads to use. 0 lets Netty decide based on amount of cores
123     int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
124 
125     // Config to enable native transport. Does not seem to be stable at time of implementation
126     // although it is not extensively tested.
127     boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
128 
129     // Use the faster native epoll transport mechanism on linux if enabled
130     if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
131       if (LOG.isDebugEnabled()) {
132         LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
133       }
134       return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
135           Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
136     } else {
137       if (LOG.isDebugEnabled()) {
138         LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
139       }
140       return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
141           Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
142     }
143   }
144 
145   /**
146    * Constructor for tests
147    *
148    * @param configuration      to HBase
149    * @param clusterId          for the cluster
150    * @param localAddress       local address to connect to
151    * @param metrics            the connection metrics
152    * @param channelInitializer for custom channel handlers
153    */
154   protected AsyncRpcClient(Configuration configuration, String clusterId,
155       SocketAddress localAddress, MetricsConnection metrics,
156       ChannelInitializer<SocketChannel> channelInitializer) {
157     super(configuration, clusterId, localAddress, metrics);
158 
159     if (LOG.isDebugEnabled()) {
160       LOG.debug("Starting async Hbase RPC client");
161     }
162 
163     Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
164     this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
165     if (useGlobalEventLoopGroup) {
166       eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
167     } else {
168       eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
169     }
170     if (LOG.isDebugEnabled()) {
171       LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
172           + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
173     }
174 
175     this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
176     this.failedServers = new FailedServers(configuration);
177 
178     int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
179         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
180 
181     // Configure the default bootstrap.
182     this.bootstrap = new Bootstrap();
183     bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
184         .channel(eventLoopGroupAndChannelClass.getSecond())
185         .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
186         .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
187         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
188     if (channelInitializer == null) {
189       channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
190     }
191     bootstrap.handler(channelInitializer);
192     if (localAddress != null) {
193       bootstrap.localAddress(localAddress);
194     }
195   }
196 
197   /** Used in test only. */
198   AsyncRpcClient(Configuration configuration) {
199     this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
200   }
201 
202   /** Used in test only. */
203   AsyncRpcClient(Configuration configuration,
204       ChannelInitializer<SocketChannel> channelInitializer) {
205     this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
206   }
207 
208   /**
209    * Constructor
210    *
211    * @param configuration to HBase
212    * @param clusterId     for the cluster
213    * @param localAddress  local address to connect to
214    * @param metrics       the connection metrics
215    */
216   public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
217       MetricsConnection metrics) {
218     this(configuration, clusterId, localAddress, metrics, null);
219   }
220 
221   /**
222    * Make a call, passing <code>param</code>, to the IPC server running at
223    * <code>address</code> which is servicing the <code>protocol</code> protocol,
224    * with the <code>ticket</code> credentials, returning the value.
225    * Throws exceptions if there are network problems or if the remote code
226    * threw an exception.
227    *
228    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
229    *               {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
230    *               instance of User each time so will be a new Connection each time.
231    * @return A pair with the Message response and the Cell data (if any).
232    * @throws InterruptedException if call is interrupted
233    * @throws java.io.IOException  if a connection failure is encountered
234    */
235   @Override
236   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
237       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
238       InetSocketAddress addr, MetricsConnection.CallStats callStats)
239       throws IOException, InterruptedException {
240     if (pcrc == null) {
241       pcrc = new PayloadCarryingRpcController();
242     }
243     final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
244 
245     Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
246     long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
247     try {
248       Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
249       return new Pair<>(response, pcrc.cellScanner());
250     } catch (ExecutionException e) {
251       if (e.getCause() instanceof IOException) {
252         throw (IOException) e.getCause();
253       } else {
254         throw wrapException(addr, (Exception) e.getCause());
255       }
256     } catch (TimeoutException e) {
257       CallTimeoutException cte = new CallTimeoutException(promise.toString());
258       throw wrapException(addr, cte);
259     }
260   }
261 
262   /**
263    * Call method async
264    */
265   private void callMethod(final Descriptors.MethodDescriptor md,
266       final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
267       InetSocketAddress addr, final RpcCallback<Message> done) {
268     final AsyncRpcChannel connection;
269     try {
270       connection = createRpcChannel(md.getService().getName(), addr, ticket);
271       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
272       GenericFutureListener<Future<Message>> listener =
273           new GenericFutureListener<Future<Message>>() {
274             @Override
275             public void operationComplete(Future<Message> future) throws Exception {
276               cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
277               if (metrics != null) {
278                 metrics.updateRpc(md, param, cs);
279               }
280               if (LOG.isTraceEnabled()) {
281                 LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
282               }
283               if (!future.isSuccess()) {
284                 Throwable cause = future.cause();
285                 if (cause instanceof IOException) {
286                   pcrc.setFailed((IOException) cause);
287                 } else {
288                   pcrc.setFailed(new IOException(cause));
289                 }
290               } else {
291                 try {
292                   done.run(future.get());
293                 } catch (ExecutionException e) {
294                   Throwable cause = e.getCause();
295                   if (cause instanceof IOException) {
296                     pcrc.setFailed((IOException) cause);
297                   } else {
298                     pcrc.setFailed(new IOException(cause));
299                   }
300                 } catch (InterruptedException e) {
301                   pcrc.setFailed(new IOException(e));
302                 }
303               }
304             }
305           };
306       cs.setStartTime(EnvironmentEdgeManager.currentTime());
307       connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
308     } catch (StoppedRpcClientException|FailedServerException e) {
309       pcrc.setFailed(e);
310     }
311   }
312 
313   private boolean closed = false;
314 
315   /**
316    * Close netty
317    */
318   public void close() {
319     if (LOG.isDebugEnabled()) {
320       LOG.debug("Stopping async HBase RPC client");
321     }
322 
323     synchronized (connections) {
324       if (closed) {
325         return;
326       }
327       closed = true;
328       for (AsyncRpcChannel conn : connections.values()) {
329         conn.close(null);
330       }
331     }
332     // do not close global EventLoopGroup.
333     if (!useGlobalEventLoopGroup) {
334       bootstrap.group().shutdownGracefully();
335     }
336   }
337 
338   /**
339    * Create a cell scanner
340    *
341    * @param cellBlock to create scanner for
342    * @return CellScanner
343    * @throws java.io.IOException on error on creation cell scanner
344    */
345   public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
346     return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
347   }
348 
349   /**
350    * Build cell block
351    *
352    * @param cells to create block with
353    * @return ByteBuffer with cells
354    * @throws java.io.IOException if block creation fails
355    */
356   public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
357     return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
358   }
359 
360   /**
361    * Creates an RPC client
362    *
363    * @param serviceName    name of servicce
364    * @param location       to connect to
365    * @param ticket         for current user
366    * @return new RpcChannel
367    * @throws StoppedRpcClientException when Rpc client is stopped
368    * @throws FailedServerException if server failed
369    */
370   private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
371       User ticket) throws StoppedRpcClientException, FailedServerException {
372     // Check if server is failed
373     if (this.failedServers.isFailedServer(location)) {
374       if (LOG.isDebugEnabled()) {
375         LOG.debug("Not trying to connect to " + location +
376             " this server is in the failed servers list");
377       }
378       throw new FailedServerException(
379           "This server is in the failed servers list: " + location);
380     }
381 
382     int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
383 
384     AsyncRpcChannel rpcChannel;
385     synchronized (connections) {
386       if (closed) {
387         throw new StoppedRpcClientException();
388       }
389       rpcChannel = connections.get(hashCode);
390       if (rpcChannel == null || !rpcChannel.isAlive()) {
391         rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
392         connections.put(hashCode, rpcChannel);
393       }
394     }
395 
396     return rpcChannel;
397   }
398 
399   /**
400    * Interrupt the connections to the given ip:port server. This should be called if the server
401    * is known as actually dead. This will not prevent current operation to be retried, and,
402    * depending on their own behavior, they may retry on the same server. This can be a feature,
403    * for example at startup. In any case, they're likely to get connection refused (if the
404    * process died) or no route to host: i.e. there next retries should be faster and with a
405    * safe exception.
406    *
407    * @param sn server to cancel connections for
408    */
409   @Override
410   public void cancelConnections(ServerName sn) {
411     synchronized (connections) {
412       for (AsyncRpcChannel rpcChannel : connections.values()) {
413         if (rpcChannel.isAlive() &&
414             rpcChannel.address.getPort() == sn.getPort() &&
415             rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
416           LOG.info("The server on " + sn.toString() +
417               " is dead - stopping the connection " + rpcChannel.toString());
418           rpcChannel.close(null);
419         }
420       }
421     }
422   }
423 
424   /**
425    * Remove connection from pool
426    */
427   public void removeConnection(AsyncRpcChannel connection) {
428     int connectionHashCode = connection.hashCode();
429     synchronized (connections) {
430       // we use address as cache key, so we should check here to prevent removing the
431       // wrong connection
432       AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
433       if (connectionInPool != null && connectionInPool.equals(connection)) {
434         this.connections.remove(connectionHashCode);
435       } else if (LOG.isDebugEnabled()) {
436         LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
437           connection.toString(), System.identityHashCode(connection),
438           System.identityHashCode(connectionInPool)));
439       }
440     }
441   }
442 
443   /**
444    * Creates a "channel" that can be used by a protobuf service.  Useful setting up
445    * protobuf stubs.
446    *
447    * @param sn server name describing location of server
448    * @param user which is to use the connection
449    * @param rpcTimeout default rpc operation timeout
450    *
451    * @return A rpc channel that goes via this rpc client instance.
452    */
453   public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
454     return new RpcChannelImplementation(this, sn, user, rpcTimeout);
455   }
456 
457   /**
458    * Blocking rpc channel that goes via hbase rpc.
459    */
460   @VisibleForTesting
461   public static class RpcChannelImplementation implements RpcChannel {
462     private final InetSocketAddress isa;
463     private final AsyncRpcClient rpcClient;
464     private final User ticket;
465     private final int channelOperationTimeout;
466 
467     /**
468      * @param channelOperationTimeout - the default timeout when no timeout is given
469      */
470     protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
471         final ServerName sn, final User ticket, int channelOperationTimeout) {
472       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
473       this.rpcClient = rpcClient;
474       this.ticket = ticket;
475       this.channelOperationTimeout = channelOperationTimeout;
476     }
477 
478     @Override
479     public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
480         Message param, Message returnType, RpcCallback<Message> done) {
481       PayloadCarryingRpcController pcrc;
482       if (controller != null) {
483         pcrc = (PayloadCarryingRpcController) controller;
484         if (!pcrc.hasCallTimeout()) {
485           pcrc.setCallTimeout(channelOperationTimeout);
486         }
487       } else {
488         pcrc = new PayloadCarryingRpcController();
489         pcrc.setCallTimeout(channelOperationTimeout);
490       }
491 
492       this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
493     }
494   }
495 
496   Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
497     return WHEEL_TIMER.newTimeout(task, delay, unit);
498   }
499 }