View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.ipc;
19  
20  import io.netty.bootstrap.Bootstrap;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.EventLoopGroup;
25  import io.netty.channel.epoll.EpollEventLoopGroup;
26  import io.netty.channel.epoll.EpollSocketChannel;
27  import io.netty.channel.nio.NioEventLoopGroup;
28  import io.netty.channel.socket.SocketChannel;
29  import io.netty.channel.socket.nio.NioSocketChannel;
30  import io.netty.util.HashedWheelTimer;
31  import io.netty.util.Timeout;
32  import io.netty.util.TimerTask;
33  import io.netty.util.concurrent.Future;
34  import io.netty.util.concurrent.GenericFutureListener;
35  import io.netty.util.concurrent.Promise;
36  
37  import java.io.IOException;
38  import java.net.InetSocketAddress;
39  import java.net.SocketAddress;
40  import java.nio.ByteBuffer;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.TimeoutException;
44  import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.CellScanner;
48  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.classification.InterfaceAudience;
52  import org.apache.hadoop.hbase.security.User;
53  import org.apache.hadoop.hbase.util.JVM;
54  import org.apache.hadoop.hbase.util.Pair;
55  import org.apache.hadoop.hbase.util.PoolMap;
56  import org.apache.hadoop.hbase.util.Threads;
57  
58  import com.google.common.annotations.VisibleForTesting;
59  import com.google.protobuf.Descriptors;
60  import com.google.protobuf.Message;
61  import com.google.protobuf.RpcCallback;
62  import com.google.protobuf.RpcChannel;
63  import com.google.protobuf.RpcController;
64  
65  /**
66   * Netty client for the requests and responses
67   */
68  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
69  public class AsyncRpcClient extends AbstractRpcClient {
70  
71    public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
72    public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
73    public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
74  
75    private static final HashedWheelTimer WHEEL_TIMER =
76        new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
77            100, TimeUnit.MILLISECONDS);
78  
79    private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
80        new ChannelInitializer<SocketChannel>() {
81      @Override
82      protected void initChannel(SocketChannel ch) throws Exception {
83        //empty initializer
84      }
85    };
86  
87    protected final AtomicInteger callIdCnt = new AtomicInteger();
88  
89    private final PoolMap<Integer, AsyncRpcChannel> connections;
90  
91    final FailedServers failedServers;
92  
93    @VisibleForTesting
94    final Bootstrap bootstrap;
95  
96    private final boolean useGlobalEventLoopGroup;
97  
98    @VisibleForTesting
99    static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;
100 
101   private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
102       getGlobalEventLoopGroup(Configuration conf) {
103     if (GLOBAL_EVENT_LOOP_GROUP == null) {
104       GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
105       if (LOG.isDebugEnabled()) {
106         LOG.debug("Create global event loop group "
107             + GLOBAL_EVENT_LOOP_GROUP.getFirst().getClass().getSimpleName());
108       }
109     }
110     return GLOBAL_EVENT_LOOP_GROUP;
111   }
112 
113   private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
114       Configuration conf) {
115     // Max amount of threads to use. 0 lets Netty decide based on amount of cores
116     int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0);
117 
118     // Config to enable native transport. Does not seem to be stable at time of implementation
119     // although it is not extensively tested.
120     boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);
121 
122     // Use the faster native epoll transport mechanism on linux if enabled
123     if (epollEnabled && JVM.isLinux() && JVM.isAmd64()) {
124       if (LOG.isDebugEnabled()) {
125         LOG.debug("Create EpollEventLoopGroup with maxThreads = " + maxThreads);
126       }
127       return new Pair<EventLoopGroup, Class<? extends Channel>>(new EpollEventLoopGroup(maxThreads,
128           Threads.newDaemonThreadFactory("AsyncRpcChannel")), EpollSocketChannel.class);
129     } else {
130       if (LOG.isDebugEnabled()) {
131         LOG.debug("Create NioEventLoopGroup with maxThreads = " + maxThreads);
132       }
133       return new Pair<EventLoopGroup, Class<? extends Channel>>(new NioEventLoopGroup(maxThreads,
134           Threads.newDaemonThreadFactory("AsyncRpcChannel")), NioSocketChannel.class);
135     }
136   }
137 
138   /**
139    * Constructor for tests
140    *
141    * @param configuration      to HBase
142    * @param clusterId          for the cluster
143    * @param localAddress       local address to connect to
144    * @param channelInitializer for custom channel handlers
145    */
146   @VisibleForTesting
147   AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
148       ChannelInitializer<SocketChannel> channelInitializer) {
149     super(configuration, clusterId, localAddress);
150 
151     if (LOG.isDebugEnabled()) {
152       LOG.debug("Starting async Hbase RPC client");
153     }
154 
155     Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
156     this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);
157     if (useGlobalEventLoopGroup) {
158       eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
159     } else {
160       eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
161     }
162     if (LOG.isDebugEnabled()) {
163       LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
164           + eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
165     }
166 
167     this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
168     this.failedServers = new FailedServers(configuration);
169 
170     int operationTimeout = configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
171         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
172 
173     // Configure the default bootstrap.
174     this.bootstrap = new Bootstrap();
175     bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
176         .channel(eventLoopGroupAndChannelClass.getSecond())
177         .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
178         .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
179         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, operationTimeout);
180     if (channelInitializer == null) {
181       channelInitializer = DEFAULT_CHANNEL_INITIALIZER;
182     }
183     bootstrap.handler(channelInitializer);
184     if (localAddress != null) {
185       bootstrap.localAddress(localAddress);
186     }
187   }
188 
189   /**
190    * Constructor
191    *
192    * @param configuration to HBase
193    * @param clusterId     for the cluster
194    * @param localAddress  local address to connect to
195    */
196   public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) {
197     this(configuration, clusterId, localAddress, null);
198   }
199 
200   /**
201    * Make a call, passing <code>param</code>, to the IPC server running at
202    * <code>address</code> which is servicing the <code>protocol</code> protocol,
203    * with the <code>ticket</code> credentials, returning the value.
204    * Throws exceptions if there are network problems or if the remote code
205    * threw an exception.
206    *
207    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
208    *               {@link org.apache.hadoop.hbase.security.UserProvider#getCurrent()} makes a new
209    *               instance of User each time so will be a new Connection each time.
210    * @return A pair with the Message response and the Cell data (if any).
211    * @throws InterruptedException if call is interrupted
212    * @throws java.io.IOException  if a connection failure is encountered
213    */
214   @Override
215   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
216       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
217       InetSocketAddress addr) throws IOException, InterruptedException {
218     if (pcrc == null) {
219       pcrc = new PayloadCarryingRpcController();
220     }
221     final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
222 
223     Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
224     long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
225     try {
226       Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
227       return new Pair<>(response, pcrc.cellScanner());
228     } catch (ExecutionException e) {
229       if (e.getCause() instanceof IOException) {
230         throw (IOException) e.getCause();
231       } else {
232         throw wrapException(addr, (Exception) e.getCause());
233       }
234     } catch (TimeoutException e) {
235       CallTimeoutException cte = new CallTimeoutException(promise.toString());
236       throw wrapException(addr, cte);
237     }
238   }
239 
240   /**
241    * Call method async
242    */
243   private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc,
244       Message param, Message returnType, User ticket, InetSocketAddress addr,
245       final RpcCallback<Message> done) {
246     final AsyncRpcChannel connection;
247     try {
248       connection = createRpcChannel(md.getService().getName(), addr, ticket);
249 
250       connection.callMethod(md, pcrc, param, returnType).addListener(
251           new GenericFutureListener<Future<Message>>() {
252         @Override
253         public void operationComplete(Future<Message> future) throws Exception {
254           if(!future.isSuccess()){
255             Throwable cause = future.cause();
256             if (cause instanceof IOException) {
257               pcrc.setFailed((IOException) cause);
258             }else{
259               pcrc.setFailed(new IOException(cause));
260             }
261           }else{
262             try {
263               done.run(future.get());
264             }catch (ExecutionException e){
265               Throwable cause = e.getCause();
266               if (cause instanceof IOException) {
267                 pcrc.setFailed((IOException) cause);
268               }else{
269                 pcrc.setFailed(new IOException(cause));
270               }
271             }catch (InterruptedException e){
272               pcrc.setFailed(new IOException(e));
273             }
274           }
275         }
276       });
277     } catch (StoppedRpcClientException|FailedServerException e) {
278       pcrc.setFailed(e);
279     }
280   }
281 
282   private boolean closed = false;
283 
284   /**
285    * Close netty
286    */
287   public void close() {
288     if (LOG.isDebugEnabled()) {
289       LOG.debug("Stopping async HBase RPC client");
290     }
291 
292     synchronized (connections) {
293       if (closed) {
294         return;
295       }
296       closed = true;
297       for (AsyncRpcChannel conn : connections.values()) {
298         conn.close(null);
299       }
300     }
301     // do not close global EventLoopGroup.
302     if (!useGlobalEventLoopGroup) {
303       bootstrap.group().shutdownGracefully();
304     }
305   }
306 
307   /**
308    * Create a cell scanner
309    *
310    * @param cellBlock to create scanner for
311    * @return CellScanner
312    * @throws java.io.IOException on error on creation cell scanner
313    */
314   public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
315     return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
316   }
317 
318   /**
319    * Build cell block
320    *
321    * @param cells to create block with
322    * @return ByteBuffer with cells
323    * @throws java.io.IOException if block creation fails
324    */
325   public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
326     return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
327   }
328 
329   /**
330    * Creates an RPC client
331    *
332    * @param serviceName    name of servicce
333    * @param location       to connect to
334    * @param ticket         for current user
335    * @return new RpcChannel
336    * @throws StoppedRpcClientException when Rpc client is stopped
337    * @throws FailedServerException if server failed
338    */
339   private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
340       User ticket) throws StoppedRpcClientException, FailedServerException {
341     // Check if server is failed
342     if (this.failedServers.isFailedServer(location)) {
343       if (LOG.isDebugEnabled()) {
344         LOG.debug("Not trying to connect to " + location +
345             " this server is in the failed servers list");
346       }
347       throw new FailedServerException(
348           "This server is in the failed servers list: " + location);
349     }
350 
351     int hashCode = ConnectionId.hashCode(ticket,serviceName,location);
352 
353     AsyncRpcChannel rpcChannel;
354     synchronized (connections) {
355       if (closed) {
356         throw new StoppedRpcClientException();
357       }
358       rpcChannel = connections.get(hashCode);
359       if (rpcChannel == null || !rpcChannel.isAlive()) {
360         rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
361         connections.put(hashCode, rpcChannel);
362       }
363     }
364 
365     return rpcChannel;
366   }
367 
368   /**
369    * Interrupt the connections to the given ip:port server. This should be called if the server
370    * is known as actually dead. This will not prevent current operation to be retried, and,
371    * depending on their own behavior, they may retry on the same server. This can be a feature,
372    * for example at startup. In any case, they're likely to get connection refused (if the
373    * process died) or no route to host: i.e. there next retries should be faster and with a
374    * safe exception.
375    *
376    * @param sn server to cancel connections for
377    */
378   @Override
379   public void cancelConnections(ServerName sn) {
380     synchronized (connections) {
381       for (AsyncRpcChannel rpcChannel : connections.values()) {
382         if (rpcChannel.isAlive() &&
383             rpcChannel.address.getPort() == sn.getPort() &&
384             rpcChannel.address.getHostName().contentEquals(sn.getHostname())) {
385           LOG.info("The server on " + sn.toString() +
386               " is dead - stopping the connection " + rpcChannel.toString());
387           rpcChannel.close(null);
388         }
389       }
390     }
391   }
392 
393   /**
394    * Remove connection from pool
395    */
396   public void removeConnection(AsyncRpcChannel connection) {
397     int connectionHashCode = connection.hashCode();
398     synchronized (connections) {
399       // we use address as cache key, so we should check here to prevent removing the
400       // wrong connection
401       AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
402       if (connectionInPool != null && connectionInPool.equals(connection)) {
403         this.connections.remove(connectionHashCode);
404       } else if (LOG.isDebugEnabled()) {
405         LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
406           connection.toString(), System.identityHashCode(connection),
407           System.identityHashCode(connectionInPool)));
408       }
409     }
410   }
411 
412   /**
413    * Creates a "channel" that can be used by a protobuf service.  Useful setting up
414    * protobuf stubs.
415    *
416    * @param sn server name describing location of server
417    * @param user which is to use the connection
418    * @param rpcTimeout default rpc operation timeout
419    *
420    * @return A rpc channel that goes via this rpc client instance.
421    * @throws IOException when channel could not be created
422    */
423   public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
424     return new RpcChannelImplementation(this, sn, user, rpcTimeout);
425   }
426 
427   /**
428    * Blocking rpc channel that goes via hbase rpc.
429    */
430   @VisibleForTesting
431   public static class RpcChannelImplementation implements RpcChannel {
432     private final InetSocketAddress isa;
433     private final AsyncRpcClient rpcClient;
434     private final User ticket;
435     private final int channelOperationTimeout;
436 
437     /**
438      * @param channelOperationTimeout - the default timeout when no timeout is given
439      */
440     protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
441         final ServerName sn, final User ticket, int channelOperationTimeout) {
442       this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
443       this.rpcClient = rpcClient;
444       this.ticket = ticket;
445       this.channelOperationTimeout = channelOperationTimeout;
446     }
447 
448     @Override
449     public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
450         Message param, Message returnType, RpcCallback<Message> done) {
451       PayloadCarryingRpcController pcrc;
452       if (controller != null) {
453         pcrc = (PayloadCarryingRpcController) controller;
454         if (!pcrc.hasCallTimeout()) {
455           pcrc.setCallTimeout(channelOperationTimeout);
456         }
457       } else {
458         pcrc = new PayloadCarryingRpcController();
459         pcrc.setCallTimeout(channelOperationTimeout);
460       }
461 
462       this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
463     }
464   }
465 
466   Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
467     return WHEEL_TIMER.newTimeout(task, delay, unit);
468   }
469 }